cxfoot/cart_mqtt_sub/mqtt_client.py
2023-10-24 14:54:18 +08:00

177 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
import binascii
import hashlib
from paho.mqtt import client as mqtt
import base64
import json
import struct
import time
import redis
from paho.mqtt.client import ssl
import config
from mysql_obj import mysql_obj
# mqtt客户端
class mqtt_client:
    """MQTT client that publishes ``<ball_number>/Query`` requests to ball carts.

    ``server_main()`` is the entry point: it connects to the broker, starts
    the paho network thread and then blocks inside ``publish()``.  Any failure
    is logged through ``printf()`` and the whole connect/publish cycle is
    retried after one second.
    """

    # When True, printf() writes to stdout.  server_main() copies
    # config.is_debug into this attribute on every (re)connect.
    is_debug = False
    # redis.Redis handle created by redis_connect(); None until then.
    redis_obj = None
    # paho mqtt.Client for the current session; None while no session exists.
    mqtt_obj = None
    # Redis key for the MQTT connection settings.  Nothing in this class
    # reads it at the moment (the lookup in server_main() is disabled).
    redis_name_mqtt_config = "api:cxaibc:mqtt:config"
    # Key prefix for per-cart data; the full key is
    # "<prefix><ball_number>_Detail" (see _poll_stale_carts()).
    redis_name_mqtt_qc_data = "api:cxaibc:mqtt:data:"

    def on_connect(self, client, userdata, flags, rc):
        """Handle the broker's CONNACK: publish ``client.send_data`` once.

        NOTE(review): server_main() never assigns this method to
        ``mqtt_obj.on_connect`` and nothing in this file sets ``topic_id`` or
        ``send_data`` on the client, so the callback is currently unused.
        Both attributes must exist on ``client`` before it is registered.

        Args:
            client: paho client instance; must carry ``topic_id`` and
                ``send_data`` attributes.
            userdata: unused.
            flags: unused.
            rc: integer connection result code (0 means success).

        Raises:
            Exception: if ``rc`` is non-zero; the message is the readable
                status, or a generic "unknown code" text for codes outside
                the table.
        """
        rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
        # Bounds-check the lookup: an unknown code used to raise IndexError
        # (and a negative one silently picked the wrong message).
        if 0 <= rc < len(rc_status):
            status = rc_status[rc]
        else:
            status = "未知返回码:{}".format(rc)
        if rc != 0:
            raise Exception(status)
        self.printf("mqtt ", status, client.topic_id)
        self.mqtt_obj.publish(client.topic_id, client.send_data)
        self.printf("发送成功")

    def redis_connect(self):
        """Connect to Redis, retrying once per second until a PING succeeds.

        Stores the connected client in ``self.redis_obj``.  Only
        ``redis.ConnectionError`` triggers a retry; any other error
        propagates to the caller.  The retry is a loop rather than recursion
        so that a long outage cannot exhaust the interpreter's stack.
        """
        while True:
            try:
                print('链接redis')
                self.redis_obj = redis.Redis(
                    host=config.redis_server,
                    port=config.redis_port,
                    db=config.redis_database,
                    decode_responses=True,
                )
                self.redis_obj.ping()
                return
            except redis.ConnectionError:
                print('重新链接redis')
                time.sleep(1)

    def publish(self):
        """Send a Detail query for the fixed cart id "33333" every 11 seconds.

        Never returns; it ends only when an exception propagates out.

        NOTE(review): this is a debugging stub.  The production polling logic
        used to sit *after* an unconditional ``continue`` in this loop and was
        therefore unreachable.  It now lives in ``_poll_stale_carts()`` so it
        is no longer dead code inside this method; switch this method over to
        it once Redis/MySQL access is re-enabled in server_main().
        """
        while True:
            self.sendDetail("33333")
            time.sleep(11)

    def _poll_stale_carts(self):
        """Query every cart whose cached Detail record is missing or stale.

        This is the production loop that was previously unreachable inside
        ``publish()``; it is not called anywhere yet.  It requires
        ``self.redis_obj`` to be connected (see ``redis_connect()``).

        Each pass reads all ``ball_number`` values from ``cx_ball_cart``.  If
        the table is empty it waits 60 seconds and retries.  Otherwise it
        sends a query for every cart whose ``<prefix><ball_number>_Detail``
        Redis value is absent or whose ``time`` field is at least three
        minutes old, then waits three minutes.  Never returns.
        """
        while True:
            carts = None
            with mysql_obj() as db:
                sql = """
                    select ball_number from cx_ball_cart
                """
                db.cursor.execute(sql)
                rows = db.cursor.fetchall()
                if len(rows) != 0:
                    carts = rows
            if carts is None:
                time.sleep(60)
                continue
            try:
                for row in carts:
                    # Rows are indexed by column name, so the cursor is
                    # presumably a dict cursor — confirm in mysql_obj.
                    cached = self.redis_obj.get(
                        "{}{}_{}".format(self.redis_name_mqtt_qc_data, row['ball_number'], "Detail")
                    )
                    if cached is None:
                        self.sendDetail(row['ball_number'])
                        continue
                    detail = json.loads(cached)
                    if detail['time'] <= time.time() - 60 * 3:
                        self.sendDetail(row['ball_number'])
            except Exception as e:
                self._log_exception("执行数据失败:{}".format(e), e)
            time.sleep(60 * 3)

    def sendDetail(self, ball_cart_id):
        """Publish a Detail query packet to the ``<ball_cart_id>/Query`` topic.

        Failures are logged through ``printf()`` and swallowed so that one
        bad publish does not stop the polling loop.  ``KeyboardInterrupt`` and
        ``SystemExit`` are deliberately *not* swallowed.

        Args:
            ball_cart_id: cart identifier used as the topic prefix.
        """
        try:
            packet = self._build_query_packet(int(time.time()))
            self.mqtt_obj.publish("{}/Query".format(ball_cart_id), packet)
            # NOTE(review): leftover debug output, kept so stdout is unchanged.
            print('ssss')
        except Exception as e:
            self._log_exception("发送detail数据失败{}".format(e), e)

    def _build_query_packet(self, timestamp):
        """Return the 9-byte query packet for the given Unix timestamp.

        Layout: ``struct.pack('i', timestamp)``, ``struct.pack('i', 1)`` and
        one trailing byte holding the XOR of the preceding bytes.  The meaning
        of the constant 1 is not visible in this file (presumably a command
        code — TODO confirm against the device protocol).

        The checksum byte is built with ``bytes([value])``.  The previous
        ``binascii.a2b_hex(hex(value)[2:])`` raised "Odd-length string" for
        every checksum below 0x10, which dropped roughly 1 in 16 queries.
        """
        # Native 'i' packing is kept as-is so the wire format does not change.
        payload = struct.pack('i', timestamp) + struct.pack('i', 1)
        return payload + bytes([self.return_x_o_r(payload)])

    def return_x_o_r(self, data):
        """Return the XOR of all byte values in ``data``.

        Args:
            data: ``bytes``-like object (any iterable of ints works).

        Returns:
            The XOR checksum as an int; 0 for empty input (the XOR identity,
            where the previous implementation returned ``None``).
        """
        checksum = 0
        for byte in data:
            checksum ^= byte
        return checksum

    def server_main(self):
        """Connect to the broker and run ``publish()``; retry forever on error.

        Every failure is logged, the current MQTT session is torn down, and a
        new attempt starts after one second.  ``KeyboardInterrupt`` and
        ``SystemExit`` propagate (after the session is torn down) so the
        process can actually be stopped.
        """
        while True:
            try:
                # NOTE(review): loading the settings from Redis
                # (redis_connect() + JSON at redis_name_mqtt_config with the
                # fields mqtt_server / mqtt_port / mqtt_user / mqtt_pass) is
                # disabled; the hard-coded values below override whatever
                # config.py provides.
                # SECURITY(review): a broker credential is committed here in
                # plain text — move it to configuration / a secret store.
                config.mqtt_server = "mqtts.heclouds.com"
                config.mqtt_port = 8883
                config.mqtt_user = "570670"
                config.mqtt_pass = "version=2018-10-31&res=products%2F570670%2Fdevices%2Ft001&et=1675424266&method=sha1&sign=%2B%2FCnauc2Pr73KHu1gUtMfJ2VgVg%3D"
                self.is_debug = config.is_debug
                client_id = "test"
                self.mqtt_obj = mqtt.Client(client_id, transport='tcp')
                # Broker account credentials.
                self.mqtt_obj.username_pw_set(config.mqtt_user, config.mqtt_pass)
                # NOTE(review): TLS setup (tls_set with a CA certificate and
                # TLS 1.2, tls_insecure_set(True)) is disabled although the
                # port is 8883 — confirm the broker accepts this.
                self.mqtt_obj.connect(config.mqtt_server, config.mqtt_port, 60)
                self.mqtt_obj.loop_start()
                self.publish()
            except Exception as e:
                self._log_exception("mqtt 启动失败:{}".format(e), e)
                time.sleep(1)
            finally:
                # Without this every retry leaked the previous client and its
                # background network thread.
                self._stop_mqtt()

    def _stop_mqtt(self):
        """Disconnect the current MQTT client and stop its network thread.

        Safe to call when no client exists.  Errors during teardown are
        logged and ignored because the caller is about to reconnect anyway.
        """
        client = self.mqtt_obj
        if client is None:
            return
        self.mqtt_obj = None
        try:
            client.disconnect()
            client.loop_stop()
        except Exception as e:
            self.printf("mqtt 关闭失败:{}".format(e))

    def _log_exception(self, message, exc):
        """Log ``message`` plus the file and line where ``exc`` was raised.

        The traceback chain is followed to its innermost frame; the first
        entry alone only identifies the frame that caught the exception.
        """
        self.printf(message)
        tb = exc.__traceback__
        if tb is None:
            return
        while tb.tb_next is not None:
            tb = tb.tb_next
        self.printf(tb.tb_frame.f_code.co_filename)  # file where the exception occurred
        self.printf(tb.tb_lineno)

    def printf(self, *data):
        """Print ``data`` (as a tuple) and the current timestamp in debug mode.

        Does nothing unless ``self.is_debug`` is exactly ``True``.
        """
        if self.is_debug is True:
            print(data, time.time())
if __name__ == "__main__":
    # Script entry point: create the client and hand control to its
    # connect-and-publish loop.
    service = mqtt_client()
    service.server_main()