# -*- coding:utf-8 -*-
import base64
import binascii
import hashlib
import json
import struct
import time

import redis
from paho.mqtt import client as mqtt
from paho.mqtt.client import ssl

import config
from mysql_obj import mysql_obj


class mqtt_client:
    """MQTT client that periodically publishes "Detail" query frames for ball carts.

    ``server_main`` is the entry point: it connects to the broker, starts the
    paho network loop and then hands control to ``publish``, which never returns.
    """

    # When True, ``printf`` writes diagnostics to stdout.
    is_debug = False
    # redis.Redis handle; populated by ``redis_connect``.
    redis_obj = None
    # paho mqtt.Client handle; populated by ``server_main``.
    mqtt_obj = None
    # Redis key that holds the broker configuration.
    redis_name_mqtt_config = "api:cxaibc:mqtt:config"
    # Redis key prefix for per-ball-cart data.
    redis_name_mqtt_qc_data = "api:cxaibc:mqtt:data:"

    def on_connect(self, client, userdata, flags, rc):
        """Callback to run once the broker has answered a connect request.

        NOTE(review): nothing in this file assigns this method to
        ``self.mqtt_obj.on_connect``, and it reads ``client.topic_id`` /
        ``client.send_data``, which must be set on the client by the caller.

        Args:
            client: the paho client instance that connected.
            userdata: user data passed through by paho (unused).
            flags: broker response flags (unused).
            rc: connection return code; 0 means success.

        Raises:
            Exception: when ``rc`` is non-zero, carrying the status text.
        """
        rc_status = [
            "连接成功",
            "协议版本错误",
            "无效的客户端标识",
            "服务器无法使用",
            "用户名或密码错误",
            "无授权",
        ]
        # Codes outside the table used to raise IndexError and mask the real
        # failure; fall back to a generic message instead.
        try:
            status = rc_status[rc]
        except (IndexError, TypeError):
            status = "未知返回码: {}".format(rc)
        if rc != 0:
            raise Exception(status)
        self.printf("mqtt: ", status, client.topic_id)
        self.mqtt_obj.publish(client.topic_id, client.send_data)
        self.printf("发送成功")

    def redis_connect(self):
        """Connect to Redis, retrying once per second until a ping succeeds.

        Stores the client in ``self.redis_obj``. Retries in a loop rather than
        by recursion so that a long outage cannot exhaust the call stack.
        """
        while True:
            try:
                print('链接redis')
                self.redis_obj = redis.Redis(
                    host=config.redis_server,
                    port=config.redis_port,
                    db=config.redis_database,
                    decode_responses=True,
                )
                self.redis_obj.ping()
                return
            except redis.ConnectionError:
                print('重新链接redis')
                time.sleep(1)

    def publish(self):
        """Run the publishing loop; never returns.

        As written, every iteration sends a Detail query for the fixed id
        "33333", sleeps 11 seconds and restarts the loop.
        """
        while True:
            # NOTE(review): these three statements short-circuit the loop, so
            # everything below them is unreachable. It looks like a debugging
            # probe; remove it to re-enable the database/Redis driven polling
            # (which also needs ``redis_connect`` to have been called).
            self.sendDetail("33333")
            time.sleep(11)
            continue

            print("3333")
            temp_data = None
            with mysql_obj() as db:
                sql = """ select ball_number from cx_ball_cart """
                db.cursor.execute(sql)
                res = db.cursor.fetchall()
                if res:
                    temp_data = res
            if temp_data is None:
                # No carts found: wait a minute before asking again.
                time.sleep(60)
                continue
            try:
                for val in temp_data:
                    get = self.redis_obj.get(
                        "{}{}_{}".format(
                            self.redis_name_mqtt_qc_data,
                            val['ball_number'],
                            "Detail",
                        )
                    )
                    if get is None:
                        # Nothing cached for this cart yet: request it.
                        self.sendDetail(val['ball_number'])
                        continue
                    json_de = json.loads(get)
                    # Cached entry is older than three minutes: refresh it.
                    if json_de['time'] <= time.time() - 60 * 3:
                        self.sendDetail(val['ball_number'])
                        continue
            except Exception as e:
                self._report_error("执行数据失败:{}".format(e), e)
            time.sleep(60 * 3)

    def sendDetail(self, ball_cart_id):
        """Publish a Detail query frame to the ``<ball_cart_id>/Query`` topic.

        The payload is ``struct.pack('i', now)`` followed by
        ``struct.pack('i', 1)`` and one trailing byte holding the XOR of all
        preceding bytes. Failures are reported through ``printf`` and
        swallowed so the caller's loop keeps running.

        Args:
            ball_cart_id: identifier used as the topic prefix.
        """
        try:
            time_base = int(time.time())
            # NOTE(review): 'i' uses the host's native byte order and size;
            # confirm this matches what the device expects.
            payload = struct.pack('i', time_base) + struct.pack('i', 1)
            checksum = self.return_x_o_r(payload)
            # The checksum was previously built via hex() + a2b_hex(), which
            # raises "Odd-length string" for values below 0x10 and silently
            # dropped the message. bytes([n]) yields the same single byte for
            # every value in 0..255.
            payload += bytes([checksum])
            self.mqtt_obj.publish("{}/Query".format(ball_cart_id), payload)
            print('ssss')
        except Exception as e:
            self._report_error("发送detail数据失败:{}".format(e), e)

    def return_x_o_r(self, data):
        """Return the XOR of every item in ``data``.

        Args:
            data: iterable of ints, e.g. a ``bytes`` object.

        Returns:
            The XOR fold of all items; 0 when ``data`` is empty.
        """
        result = 0
        for value in data:
            result ^= value
        return result

    def server_main(self):
        """Connect to the broker and run ``publish``; retry forever on failure.

        Only ``Exception`` is caught, so KeyboardInterrupt and SystemExit
        propagate and the service can be stopped.
        """
        while True:
            try:
                # Loading the broker settings from Redis is currently disabled:
                # if self.redis_obj is None:
                #     self.redis_connect()
                # get = self.redis_obj.get(self.redis_name_mqtt_config)
                # if get is not None:
                #     json_de = json.loads(get)
                #     if json_de is not None:
                #         config.mqtt_server = json_de['mqtt_server']
                #         config.mqtt_port = int(json_de['mqtt_port'])
                #         config.mqtt_user = json_de['mqtt_user']
                #         config.mqtt_pass = json_de['mqtt_pass']

                # NOTE(review): credentials are hard-coded here and overwrite
                # whatever ``config`` provided; move them out of source control.
                config.mqtt_server = "mqtts.heclouds.com"
                config.mqtt_port = 8883
                config.mqtt_user = "570670"
                config.mqtt_pass = "version=2018-10-31&res=products%2F570670%2Fdevices%2Ft001&et=1675424266&method=sha1&sign=%2B%2FCnauc2Pr73KHu1gUtMfJ2VgVg%3D"

                self.is_debug = config.is_debug
                client_id = "test"
                self.mqtt_obj = mqtt.Client(client_id, transport='tcp')
                # Broker account and password.
                self.mqtt_obj.username_pw_set(config.mqtt_user, config.mqtt_pass)
                # NOTE(review): TLS setup is disabled although port 8883 is
                # conventionally the MQTT-over-TLS port — confirm.
                # self.mqtt_obj.tls_set(r"C:\Users\Administrator\Downloads\MQTTS-certificate.pem", tls_version=ssl.PROTOCOL_TLSv1_2)
                # self.mqtt_obj.tls_insecure_set(True)
                self.mqtt_obj.connect(config.mqtt_server, config.mqtt_port, 60)
                self.mqtt_obj.loop_start()
                self.publish()
            except Exception as e:
                self._report_error("mqtt 启动失败:{}".format(e), e)
            # Brief pause before the next connection attempt.
            time.sleep(1)

    def printf(self, *data):
        """Print ``data`` (as a tuple) and the current timestamp when debugging.

        Args:
            *data: values to print; ignored unless ``is_debug`` is True.
        """
        if self.is_debug is True:
            print(data, time.time())

    def _report_error(self, message, error):
        """Emit ``message`` plus the file and line recorded in ``error``'s traceback."""
        self.printf(message)
        # File in which the exception was caught.
        self.printf(error.__traceback__.tb_frame.f_globals["__file__"])
        self.printf(error.__traceback__.tb_lineno)


if __name__ == "__main__":
    mqtt_client().server_main()