177 lines
6.1 KiB
Python
177 lines
6.1 KiB
Python
# -*- coding:utf-8 -*-
|
||
import binascii
|
||
import hashlib
|
||
from paho.mqtt import client as mqtt
|
||
import base64
|
||
import json
|
||
import struct
|
||
import time
|
||
import redis
|
||
from paho.mqtt.client import ssl
|
||
|
||
import config
|
||
from mysql_obj import mysql_obj
|
||
|
||
# mqtt客户端
|
||
class mqtt_client:
    """MQTT client that connects to a broker and publishes "Query" packets.

    ``server_main()`` is the entry point: it fills in the broker settings on
    the ``config`` module, connects, starts the paho network loop and then
    hands control to ``publish()``, which never returns.
    """

    # When True, printf() writes its arguments to stdout. Overwritten from
    # config.is_debug by server_main().
    is_debug = False

    # redis.Redis connection; only assigned by redis_connect().
    redis_obj = None

    # paho mqtt.Client instance; assigned by server_main().
    mqtt_obj = None

    # Redis key of the MQTT connection settings. Only referenced by the
    # disabled configuration loader in server_main().
    redis_name_mqtt_config = "api:cxaibc:mqtt:config"

    # Redis key prefix for ball-cart data.
    redis_name_mqtt_qc_data = "api:cxaibc:mqtt:data:"

    def on_connect(self, client, userdata, flags, rc):
        """Connection callback: report the result and publish ``client.send_data``.

        NOTE(review): nothing in this class assigns this method to
        ``mqtt_obj.on_connect`` and nothing here sets ``client.topic_id`` or
        ``client.send_data`` -- confirm whether a caller elsewhere wires
        these up.

        Args:
            client: Client object passed by the MQTT library; must expose
                ``topic_id`` and ``send_data``.
            userdata: Unused.
            flags: Unused.
            rc: Connection result code; 0 means success.

        Raises:
            Exception: If ``rc`` is not 0. The message is the matching entry
                of the status table, or a generic text for unknown codes.
        """
        rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
        # Guard the table lookup so an unexpected code reports itself instead
        # of failing with IndexError.
        if isinstance(rc, int) and 0 <= rc < len(rc_status):
            status = rc_status[rc]
        else:
            status = "未知错误(rc={})".format(rc)
        if rc != 0:
            raise Exception(status)
        self.printf("mqtt: ", status, client.topic_id)
        self.mqtt_obj.publish(client.topic_id, client.send_data)
        self.printf("发送成功")

    def redis_connect(self):
        """Connect to Redis, retrying once per second until a ping succeeds.

        The connection parameters come from ``config.redis_server``,
        ``config.redis_port`` and ``config.redis_database``. The retry is a
        loop rather than recursion so that a long outage cannot exhaust the
        interpreter's recursion limit.
        """
        while True:
            try:
                print('链接redis')
                self.redis_obj = redis.Redis(
                    host=config.redis_server,
                    port=config.redis_port,
                    db=config.redis_database,
                    decode_responses=True,
                )
                self.redis_obj.ping()
                return
            except redis.ConnectionError:
                print('重新链接redis')
                time.sleep(1)

    def publish(self):
        """Publish Query packets forever; this method never returns.

        NOTE(review): the first three statements of the loop send a query for
        the fixed id "33333" every 11 seconds and then ``continue``, so the
        database/Redis polling logic below them is currently unreachable.
        It looks like a debugging stub -- confirm before removing either
        part. The stub is preserved so runtime behaviour is unchanged.
        """
        while True:
            self.sendDetail("33333")
            time.sleep(11)
            continue

            # ---- Unreachable while the short-circuit above is in place ----
            print("3333")
            temp_data = None
            with mysql_obj() as db:
                sql = """
                    select ball_number from cx_ball_cart
                """
                db.cursor.execute(sql)
                res = db.cursor.fetchall()
                if len(res) != 0:
                    temp_data = res
            if temp_data is None:
                # No rows returned: wait a minute and query again.
                time.sleep(60)
                continue
            try:
                for val in temp_data:
                    get = self.redis_obj.get("{}{}_{}".format(self.redis_name_mqtt_qc_data, val['ball_number'], "Detail"))
                    if get is None:
                        # No cached detail entry for this ball number.
                        self.sendDetail(val['ball_number'])
                        continue
                    json_de = json.loads(get)
                    # Cached entry is at least three minutes old.
                    if json_de['time'] <= time.time() - 60 * 3:
                        self.sendDetail(val['ball_number'])
                        continue
            except Exception as e:
                self._log_failure("执行数据失败", e)
            time.sleep(60 * 3)

    def sendDetail(self, ball_cart_id):
        """Publish one Query packet to the topic ``"<ball_cart_id>/Query"``.

        Failures are reported through ``printf`` (debug mode only) and are
        not propagated. ``KeyboardInterrupt`` and ``SystemExit`` are no
        longer swallowed, so the process can be stopped.

        Args:
            ball_cart_id: Identifier used as the topic prefix.
        """
        try:
            payload = self._build_query_payload(int(time.time()))
            self.mqtt_obj.publish("{}/Query".format(ball_cart_id), payload)
            print('ssss')
        except Exception as e:
            self._log_failure("发送detail数据失败", e)

    def _build_query_payload(self, timestamp):
        """Return the Query packet bytes for ``timestamp``.

        Layout: ``struct.pack('i', timestamp)``, then ``struct.pack('i', 1)``,
        then one byte holding the XOR of all preceding bytes. The native
        ``'i'`` format is kept from the original code.
        NOTE(review): ``'i'`` uses the host's byte order and int size --
        confirm what the receiving side expects.

        The checksum is appended with ``bytes([...])``. The previous
        ``a2b_hex(hex(x)[2:])`` form raised for every checksum below 0x10
        because the hex text then has an odd number of digits.
        """
        body = struct.pack('i', timestamp) + struct.pack('i', 1)
        return body + bytes([self.return_x_o_r(body)])

    def return_x_o_r(self, data):
        """Return the XOR of every byte in ``data``.

        Args:
            data: A bytes-like object (or any iterable of ints).

        Returns:
            The folded XOR value; 0 for empty input.
        """
        checksum = 0
        for byte in data:
            checksum ^= byte
        return checksum

    def server_main(self):
        """Connect to the broker and run ``publish()``, retrying forever.

        Any ``Exception`` raised while configuring, connecting or publishing
        is logged through ``printf`` and the whole sequence is retried after
        one second. ``KeyboardInterrupt`` and ``SystemExit`` propagate so the
        process can be terminated.
        """
        while True:
            try:
                # Disabled loader that would read the broker settings from
                # Redis instead of the hard-coded values below:
                # if self.redis_obj is None:
                #     self.redis_connect()
                # get = self.redis_obj.get(self.redis_name_mqtt_config)
                # if get is not None:
                #     json_de = json.loads(get)
                #     if json_de is not None:
                #         config.mqtt_server = json_de['mqtt_server']
                #         config.mqtt_port = int(json_de['mqtt_port'])
                #         config.mqtt_user = json_de['mqtt_user']
                #         config.mqtt_pass = json_de['mqtt_pass']

                # NOTE(review): credentials are hard-coded in source; they
                # should come from configuration or a secret store. Values
                # are left untouched here to preserve behaviour.
                config.mqtt_server = "mqtts.heclouds.com"
                config.mqtt_port = 8883
                config.mqtt_user = "570670"
                config.mqtt_pass = "version=2018-10-31&res=products%2F570670%2Fdevices%2Ft001&et=1675424266&method=sha1&sign=%2B%2FCnauc2Pr73KHu1gUtMfJ2VgVg%3D"

                self.is_debug = config.is_debug
                client_id = "test"
                self.mqtt_obj = mqtt.Client(client_id, transport='tcp')
                self.mqtt_obj.username_pw_set(config.mqtt_user, config.mqtt_pass)  # broker user name / password
                # NOTE(review): port 8883 is conventionally MQTT over TLS,
                # but the TLS setup below is disabled -- confirm the broker
                # accepts this connection.
                # self.mqtt_obj.tls_set(r"C:\Users\Administrator\Downloads\MQTTS-certificate.pem", tls_version=ssl.PROTOCOL_TLSv1_2)
                # self.mqtt_obj.tls_insecure_set(True)
                self.mqtt_obj.connect(config.mqtt_server, config.mqtt_port, 60)  # connect, keepalive 60
                self.mqtt_obj.loop_start()
                self.publish()
            except Exception as e:
                self._log_failure("mqtt 启动失败", e)
                time.sleep(1)

    def _log_failure(self, label, error):
        """Log ``label``, the error text, and the file and line of the traceback."""
        self.printf("{}:{}".format(label, error))
        self.printf(error.__traceback__.tb_frame.f_globals["__file__"])  # file in which the exception surfaced
        self.printf(error.__traceback__.tb_lineno)

    def printf(self, *data):
        """Print ``data`` (as a tuple) and the current time when debugging is on."""
        if self.is_debug is True:
            print(data, time.time())
|
||
|
||
if __name__ == "__main__":
    # Script entry point: build the client and run its connect/publish loop.
    service = mqtt_client()
    service.server_main()