# Source: cxfoot/cart_mqtt_sub/mqtt_sub.py (snapshot 2023-10-24 14:54:18 +08:00; 850 lines, 36 KiB).
# NOTE: the repository viewer flagged this file as containing ambiguous Unicode characters.

# -*- coding:utf-8 -*-
import binascii
import datetime
import hashlib
import os
import threading
import traceback
from paho.mqtt import client as mqtt
import base64
import json
import struct
import time
import redis
import config
import binascii
class MqttSub():
    """Subscribe to cart MQTT topics, decode the binary payloads and mirror them into Redis.

    Topics have the form ``<cart_id>/<type>``. Every decoded message is pushed
    onto the Redis list ``config.redis_name_mqtt_sub_list``; successfully
    decoded messages are additionally stored under
    ``redis_name_mqtt_qc_data + "<cart_id>_<type>"`` as the latest value.

    Every payload ends with a one-byte XOR checksum over the preceding bytes.
    All ``analysis*`` methods return
    ``{'status': bool, 'data': <JSON string>, 'time': <float epoch>}``.

    Struct formats use explicit byte order and standard sizes so decoding does
    not depend on the host platform (native ``'l'``/``'L'`` are 8 bytes on
    64-bit Linux, which made every 4-byte field fall back to its default).
    """

    # When True, printf() echoes diagnostics to stdout.
    is_debug = False
    # Lazily created redis.Redis client (see redis_connect).
    redis_obj = None
    # paho-mqtt client created by server_main.
    mqtt_obj = None
    # Key prefix for the latest decoded message per cart and topic type.
    redis_name_mqtt_qc_data = "api:cxaibc:mqtt:data:"
    # Key holding an optional JSON override of the MQTT connection settings.
    redis_name_mqtt_config = "api:cxaibc:mqtt:config"
    # Key prefix for warning states; the suffix is the warning category:
    #   1. GPS outside the store area
    #   2. GPS outside the safe area
    #   3. hardware link failure
    #   4. low battery
    #   5. cart tipped over
    redis_name_warn_status = "api:cxaibc:mqtt:warn_status:"
    # Attempts to persist one message to Redis before it is dropped.
    max_store_attempts = 5

    # ------------------------------------------------------------------
    # Connections
    # ------------------------------------------------------------------
    def redis_connect(self):
        """Create the Redis client, retrying once per second until a ping succeeds.

        Implemented as a loop (not recursion) so a long Redis outage cannot
        exhaust the interpreter's recursion limit.
        """
        while True:
            try:
                print('链接redis')
                self.redis_obj = redis.Redis(host=config.redis_server, port=config.redis_port,
                                             db=config.redis_database, decode_responses=True)
                self.redis_obj.ping()
                return
            except redis.ConnectionError:
                print('重新链接redis')
                time.sleep(1)

    def on_connect(self, client, userdata, flags, rc):
        """paho callback fired when the broker answers the connection request.

        Raises:
            Exception: if ``rc`` is non-zero; the message is the textual reason.
        """
        rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
        # Codes outside the table must not raise IndexError and hide the real failure.
        if isinstance(rc, int) and 0 <= rc < len(rc_status):
            reason = rc_status[rc]
        else:
            reason = "unknown return code {}".format(rc)
        if rc != 0:
            raise Exception(reason)
        self.printf("mqtt ", reason, client.topic_id)
        self.redis_connect()
        client.subscribe(client.topic_id, qos=0)  # subscribe to the wildcard topic

    def server_main(self):
        """Run the subscriber forever, reconnecting one second after any failure.

        Connection settings stored in Redis under ``redis_name_mqtt_config``
        override the values in ``config``. ``KeyboardInterrupt`` and
        ``SystemExit`` are deliberately not caught so the process can be stopped.
        """
        while True:
            try:
                if self.redis_obj is None:
                    self.redis_connect()
                stored = self.redis_obj.get(self.redis_name_mqtt_config)
                if stored is not None:
                    override = json.loads(stored)
                    if override is not None:
                        config.mqtt_server = override['mqtt_server']
                        config.mqtt_port = int(override['mqtt_port'])
                        config.mqtt_user = override['mqtt_user']
                        config.mqtt_pass = override['mqtt_pass']
                self.is_debug = config.is_debug
                client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
                self.mqtt_obj = mqtt.Client(client_id=client_id, transport='tcp')
                self.mqtt_obj.username_pw_set(config.mqtt_user, config.mqtt_pass)
                self.mqtt_obj.topic_id = "+/+"
                self.mqtt_obj.on_connect = self.on_connect
                self.mqtt_obj.on_message = self.on_message
                self.mqtt_obj.connect(config.mqtt_server, config.mqtt_port, 60)
                self.mqtt_obj.loop_forever()  # blocks until disconnect or error
            except Exception as e:
                self._log_exception("mqtt 启动失败:{}".format(e), e)
                time.sleep(1)

    # ------------------------------------------------------------------
    # Message dispatch
    # ------------------------------------------------------------------
    def on_message(self, client, userdata, msg):
        """paho callback: decode one message and persist the result.

        The payload is decoded exactly once; only the Redis write is retried
        (bounded by ``max_store_attempts``), so a malformed message can no
        longer trigger an endless re-parse / re-ack loop.

        Returns:
            False when the message is ignored or could not be handled,
            None when it was processed.
        """
        try:
            self.printf(msg.topic, "原始数据:", msg.payload)
            data = msg.topic.split('/')
            if len(data) != 2:
                return False
            cart_id, msg_type = data
            if self.redis_obj is None:
                self.redis_connect()
            if msg_type == 'Ack':
                # Acks are frames this service publishes itself; nothing to decode.
                return False
            handlers = {
                'Detail': self.analysisDetail,
                'Warn': self.analysisWarn,
                'GPS': self.analysisGPS,
                'Info': self.analysisInfo,
                'File': self.analysisFile,
                'FileResp': self.analysisFileResp,
                'sendAudio': self.analysisSendAudio,
            }
            handler = handlers.get(msg_type)
            if handler is None:
                res_json = self._analysis_unknown(msg.payload)
            else:
                res_json = handler(msg.payload, cart_id)
            if not self._store_result(cart_id, msg_type, res_json):
                return False
            self.printf(data, "解析结果:", res_json)
            return None
        except Exception as e:
            self._log_exception("on_message failed:", e)
            return False

    def _analysis_unknown(self, payload):
        """Build the (always unsuccessful) record for a topic type with no decoder."""
        temp_data = {'orig_data': self.toBase64(payload)}
        if self.x_o_r(payload) is False:
            temp_data["error"] = "x_o_r_no_success"
        if len(payload) > 4:
            temp_data['TIMESTAMP'] = self._unpack_field('<i', payload, 0)
        return self._result(False, temp_data)

    def _store_result(self, cart_id, msg_type, res_json):
        """Queue the decoded record and, when successful, store it as the latest value.

        Returns True once written, False if every attempt failed.
        """
        record = json.dumps({
            'data': res_json,
            'time': time.time(),
            'id': cart_id,
            'type': msg_type,
        })
        latest_key = "{}{}_{}".format(self.redis_name_mqtt_qc_data, cart_id, msg_type)
        keep_latest = res_json.get('status') is True
        # Detail and GPS snapshots expire after 5 minutes; other types persist.
        ttl = 60 * 5 if msg_type in ('Detail', 'GPS') else None
        queued = False
        for _ in range(self.max_store_attempts):
            try:
                if self.redis_obj is None:
                    self.redis_connect()
                if not queued:
                    self.redis_obj.lpush(config.redis_name_mqtt_sub_list, record)
                    queued = True  # never push the same record twice on retry
                if keep_latest:
                    self.redis_obj.set(latest_key, json.dumps(res_json), ex=ttl)
                return True
            except redis.RedisError as e:
                self._log_exception("redis write failed:", e)
                self.redis_obj = None  # force a fresh connection on the next attempt
                time.sleep(1)
        return False

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _log_exception(self, message, exc):
        """Append the active traceback to ./mqtt_error/<Y_m_d>/mqtt_error.txt.

        Must be called from inside an ``except`` block. Logging is best effort:
        a failure to write the file is reported on stderr and never raised.
        """
        try:
            path_name = './mqtt_error/{}'.format(datetime.date.today().strftime('%Y_%m_%d'))
            os.makedirs(path_name, exist_ok=True)
            with open('{}/mqtt_error.txt'.format(path_name), 'a+') as file_obj:
                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=file_obj)
                traceback.print_exc(file=file_obj)
        except OSError:
            traceback.print_exc()
        self.printf(message, exc)

    @staticmethod
    def _result(status, arr):
        """Wrap decoded fields in the result envelope shared by all decoders."""
        return {'status': status, 'data': json.dumps(arr), 'time': time.time()}

    @staticmethod
    def _unpack_field(fmt, data, offset, default=0):
        """Unpack one value at ``offset``; return ``default`` if the payload is too short."""
        try:
            return struct.unpack_from(fmt, data, offset)[0]
        except struct.error:
            return default

    @staticmethod
    def _decode_padded(raw):
        """Decode a fixed-width, NUL-padded text field."""
        return raw.decode('ISO-8859-1').strip('\x00')

    def _send_ack(self, ball_cart_id):
        """Publish the acknowledgement frame: u32 timestamp, 0xD1, XOR checksum.

        Best effort: failures are logged, never raised.
        """
        try:
            # '<I' yields the same bytes as the former native 'i' on little-endian
            # hosts and keeps working for timestamps past 2038.
            frame = struct.pack('<I', int(time.time())) + b"\xD1"
            frame += bytes([self.return_x_o_r(frame)])
            self.mqtt_obj.publish("{}/Ack".format(ball_cart_id), frame)
        except Exception as e:
            self._log_exception("发布错误:", e)

    def _sync_permission(self, ball_cart_id, reported_enable, reported_level):
        """Push the stored permission to the cart when it differs from what the cart reports.

        The stored value lives under ``api:cxaibc:ball_cart:<id>`` as
        ``"<enable>:<level>"``; a missing value is treated as ``0:0``.
        Publish/store failures are logged, not raised; a malformed stored value
        raises ``ValueError`` to the caller.
        """
        redis_name = "api:cxaibc:ball_cart:{}".format(ball_cart_id)
        wanted = self.redis_obj.get(redis_name)
        self.printf(wanted, 'get-data')
        if wanted is not None and wanted == "{}:{}".format(reported_enable, reported_level):
            return
        self.printf("需要去请求球车变更球车信息{}".format(ball_cart_id))
        handle_type = 0
        pattern = 0
        if wanted:
            parts = wanted.split(':')
            if len(parts) >= 2:
                handle_type = int(parts[0])
                pattern = int(parts[1])
        try:
            time_base = int(time.time())
            signature = hashlib.md5(
                "{}+{}".format(str(ball_cart_id).zfill(16), time_base).encode("utf8")
            ).hexdigest().upper().encode()
            frame = struct.pack('<IBB', time_base, handle_type, pattern) + signature
            frame += bytes([self.return_x_o_r(frame)])
            self.printf("发送数据:", handle_type, pattern, "变更请求数据:", frame)
            self.mqtt_obj.publish("{}/Permission".format(ball_cart_id), frame)
            self.redis_obj.set(redis_name, "{}:{}".format(handle_type, pattern))
        except Exception as e:
            self._log_exception("发布错误:", e)

    def _parse_file_fields(self, data, arr, decode_error_as_text):
        """Fill ``arr`` with the header fields shared by File and FileResp payloads.

        ``decode_error_as_text`` selects what FILE_PACK_DATA becomes when the
        chunk is not valid UTF-8: the error text (FileResp) or "" (File).
        """
        unpack = self._unpack_field
        arr['TIMESTAMP'] = unpack('<i', data, 0)
        arr['SOURCE_ID'] = unpack('B', data, 4)
        arr['FILE_TYPE'] = unpack('B', data, 5)
        # NOTE(review): this strips trailing ASCII '0' characters, not NUL padding;
        # kept as-is because the FileResp Redis key is derived from this value.
        arr['FILE_NAME'] = data[6:38].decode('ISO-8859-1').rstrip('0')
        arr['FILE_SIZE'] = unpack('<L', data, 38, '')
        arr['FILE_CHECK'] = unpack('<L', data, 42, '')
        arr['FILE_PACK_OFFSET'] = unpack('<L', data, 46)
        arr['FILE_PACK_LEN'] = unpack('<L', data, 50)
        arr['FILE_PACK_CHECK'] = unpack('<L', data, 54)
        chunk = data[58:-1]
        if chunk in (b'\x00\x00', b'\x00\x00\x00\x00'):
            arr['FILE_PACK_DATA'] = ""
        else:
            try:
                arr['FILE_PACK_DATA'] = chunk.decode()
            except UnicodeDecodeError as e:
                arr['FILE_PACK_DATA'] = str(e) if decode_error_as_text else ""

    # ------------------------------------------------------------------
    # Decoders
    # ------------------------------------------------------------------
    def analysisDetail(self, data, ball_cart_id):
        """Decode a ``Detail`` payload, reconcile permissions and acknowledge it.

        Layout (byte offsets): 0 TIMESTAMP i32, 4 IMEI[20], 24 SIMID[20],
        44 CSQ, 45 OPERATION_ENABLE, 46 OPERATION_LEVEL, 47 MODE,
        48 ELEC_PERCENT, 49 RANGE, 53 RANGE_CURRENT, 57 RUNTIME,
        61 RUNTIME_CURRENT (i32 each), 65 checksum.
        """
        arr = {'orig_data': self.toBase64(data)}
        if self.x_o_r(data) is False:
            self.printf('数据效验不通过')
            arr['error'] = "x_o_r_no_success"
            return self._result(False, arr)
        if len(data) < 66:
            arr['error'] = "len_data_no_q_65"
            return self._result(False, arr)
        try:
            unpack = self._unpack_field
            arr['TIMESTAMP'] = unpack('<i', data, 0)
            arr['IMEI'] = self._decode_padded(data[4:24])
            arr['SIMID'] = self._decode_padded(data[24:44])
            arr['CSQ'] = unpack('B', data, 44)
            arr['OPERATION_ENABLE'] = unpack('B', data, 45)
            arr['OPERATION_LEVEL'] = unpack('B', data, 46)
            arr['MODE'] = unpack('B', data, 47)
            arr['ELEC_PERCENT'] = unpack('B', data, 48)
            # NOTE(review): these four are read big-endian while TIMESTAMP is
            # little-endian; kept unchanged - confirm against the device protocol.
            arr['RANGE'] = unpack('>l', data, 49)
            arr['RANGE_CURRENT'] = unpack('>l', data, 53)
            arr['RUNTIME'] = unpack('>l', data, 57)
            arr['RUNTIME_CURRENT'] = unpack('>l', data, 61)
            self._sync_permission(ball_cart_id, arr['OPERATION_ENABLE'], arr['OPERATION_LEVEL'])
            self._send_ack(ball_cart_id)
        except Exception as e:
            self._log_exception("解析失败:", e)
            arr['error'] = str(e)
        # The parsed fields are still reported as a successful decode on error.
        return self._result(True, arr)

    def analysisWarn(self, data, ball_cart_id):
        """Decode a ``Warn`` payload, publish the warning flags to Redis and acknowledge it.

        Layout: 0 TIMESTAMP i32, 4 WARN i32, 8 WARN_HARDWARE, 9 WARN_LOW_POWER,
        10 WARN_FALLDOWN, 11 checksum.
        """
        arr = {'orig_data': self.toBase64(data)}
        if self.x_o_r(data) is False:
            arr['error'] = "x_o_r_no_success"
            return self._result(False, arr)
        if len(data) < 12:
            arr['error'] = "len_data_no_q_12"
            return self._result(False, arr)
        try:
            unpack = self._unpack_field
            arr['TIMESTAMP'] = unpack('<i', data, 0)
            arr['WARN'] = unpack('>l', data, 4)
            arr['WARN_HARDWARE'] = unpack('B', data, 8)
            arr['WARN_LOW_POWER'] = unpack('B', data, 9)
            arr['WARN_FALLDOWN'] = unpack('B', data, 10)
            # NOTE(review): these keys are not scoped by cart id, so carts
            # overwrite each other's state - confirm this is what readers expect.
            for category, field in ((3, 'WARN_HARDWARE'), (4, 'WARN_LOW_POWER'), (5, 'WARN_FALLDOWN')):
                self.redis_obj.setex("{}{}".format(self.redis_name_warn_status, category), 60, arr[field])
            self._send_ack(ball_cart_id)
        except Exception as e:
            self._log_exception("解析失败:", e)
            arr['error'] = str(e)
        return self._result(True, arr)

    def analysisGPS(self, data, ball_cart_id):
        """Decode a ``GPS`` payload and acknowledge it.

        Layout: 0 TIMESTAMP i32, 4 UTC_DATE u32, 8 UTC_TIME u32, 12 LAT f64,
        20 LON f64, 28 ELEV i32, 32 LAC i16, 34 CI i32, 38 checksum.
        Out-of-range or NaN coordinates are reported as 0.
        """
        arr = {'orig_data': self.toBase64(data)}
        if self.x_o_r(data) is False:
            arr['error'] = "x_o_r_no_success"
            return self._result(False, arr)
        if len(data) < 32:
            arr['error'] = "len_data_no_q_32"
            return self._result(False, arr)
        try:
            unpack = self._unpack_field
            arr['TIMESTAMP'] = unpack('<i', data, 0)
            # NOTE(review): date/time/elevation are read big-endian although the
            # sample payloads look little-endian; kept unchanged - confirm.
            arr['UTC_DATE'] = unpack('>L', data, 4)
            arr['UTC_TIME'] = unpack('>L', data, 8)
            lat = unpack('<d', data, 12)
            arr['LAT'] = lat if -90 <= lat <= 90 else 0  # also rejects NaN
            lon = unpack('<d', data, 20)
            arr['LON'] = lon if -180 <= lon <= 180 else 0
            arr['ELEV'] = unpack('>l', data, 28)
            arr['LAC'] = unpack('<h', data, 32)  # location area code
            arr['CI'] = unpack('<l', data, 34)   # cell id
            self._send_ack(ball_cart_id)
        except Exception as e:
            self._log_exception("解析失败:", e)
            arr['error'] = str(e)
        return self._result(True, arr)

    def analysisInfo(self, data, ball_cart_id):
        """Decode an ``Info`` (hardware information) payload and acknowledge it.

        Layout: 0 TIMESTAMP i32, 4 DEVICE_ADDR u8, 5 VERSION_NUM i32,
        9 VERSION_DATE i32, 13 IMEI[20], 33 SIMID[20], 53 PACK_LEN_MAX i32,
        57 checksum.
        """
        arr = {'orig_data': self.toBase64(data)}
        if self.x_o_r(data) is False:
            arr['error'] = "x_o_r_no_success"
            return self._result(False, arr)
        if len(data) < 32:
            arr['error'] = "len_data_no_q_32"
            return self._result(False, arr)
        try:
            unpack = self._unpack_field
            arr['TIMESTAMP'] = unpack('<i', data, 0)
            # A single byte: the former 'l' format could never unpack it.
            arr['DEVICE_ADDR'] = unpack('B', data, 4)
            arr['VERSION_NUM'] = unpack('<l', data, 5)
            arr['VERSION_DATE'] = unpack('<l', data, 9)
            arr['IMEI'] = self._decode_padded(data[13:33])
            arr['SIMID'] = self._decode_padded(data[33:53])
            arr['PACK_LEN_MAX'] = unpack('<l', data, 53)
            self._send_ack(ball_cart_id)
        except Exception as e:
            self._log_exception("解析失败:", e)
            arr['error'] = str(e)
        return self._result(True, arr)

    def analysisFile(self, data, ball_cart_id):
        """Decode a ``File`` (audio file chunk) payload. No acknowledgement is sent."""
        arr = {'orig_data': self.toBase64(data)}
        if self.x_o_r(data) is False:
            arr['error'] = "x_o_r_no_success"
            return self._result(False, arr)
        try:
            self._parse_file_fields(data, arr, decode_error_as_text=False)
        except Exception as e:
            self._log_exception("解析失败:", e)
            arr['error'] = str(e)
        return self._result(True, arr)

    def analysisFileResp(self, data, ball_cart_id):
        """Decode a ``FileResp`` payload and clear the pending-transfer key for that file."""
        arr = {'orig_data': self.toBase64(data)}
        if self.x_o_r(data) is False:
            arr['error'] = "x_o_r_no_success"
            return self._result(False, arr)
        try:
            self._parse_file_fields(data, arr, decode_error_as_text=True)
            redis_name = "api:cxaibc:mqtt:file_res:{}_{}".format(ball_cart_id, arr['FILE_NAME'])
            self.redis_obj.delete(redis_name)
        except Exception as e:
            self._log_exception("解析失败:", e)
            arr['error'] = str(e)
        return self._result(True, arr)

    def analysisSendAudio(self, data, ball_cart_id):
        """Decode a ``sendAudio`` payload: i32 timestamp followed by UTF-8 text."""
        arr = {'orig_data': self.toBase64(data)}
        if self.x_o_r(data) is False:
            arr['error'] = "x_o_r_no_success"
            return self._result(False, arr)
        arr['TIMESTAMP'] = self._unpack_field('<i', data, 0)
        chunk = data[4:-1]
        if chunk in (b'\x00\x00', b'\x00\x00\x00\x00'):
            arr['DATA'] = ""
        else:
            try:
                arr['DATA'] = chunk.decode()
            except UnicodeDecodeError:
                arr['DATA'] = ""
        return self._result(True, arr)

    # ------------------------------------------------------------------
    # Checksums and utilities
    # ------------------------------------------------------------------
    def x_o_r(self, data):
        """Return True if the last byte equals the XOR of all preceding bytes.

        Payloads shorter than two bytes (no body to check) are rejected
        instead of raising.
        """
        if len(data) < 2:
            return False
        return self.return_x_o_r(data[:-1]) == data[-1]

    def return_x_o_r(self, data):
        """Return the XOR of every byte in ``data``, or None when ``data`` is empty."""
        if not data:
            return None
        checksum = 0
        for byte in data:
            checksum ^= byte
        return checksum

    def toBase64(self, data):
        """Return ``data`` base64-encoded as text, falling back to a Latin-1 decode."""
        try:
            return base64.b64encode(data).decode('ISO-8859-1')
        except Exception:
            return data.decode('ISO-8859-1')

    def printf(self, *data):
        """Print the arguments (as a tuple) with a timestamp when debugging is enabled."""
        if self.is_debug is True:
            print(data, time.time())

    def analysisUnPack(self, data, type=1):
        """Debug aid: try every struct type code with every byte-order prefix.

        With ``type == 1`` ``data`` is unpacked, otherwise it is packed. The
        result maps ``"<code><order>"`` (e.g. ``"i<"``) to the value or to an
        ``"error: ..."`` string; every entry is also printed.
        """
        keys = [
            'x', 'c', 'b', 'B', '?', 'h', 'H', 'i', 'I', 'l', 'L', 'q', 'Q', 'f', 'd', 's', 'p', 'P',
        ]
        two_key = [
            '', '<', '=', '>', '!', '@'
        ]
        arr = {}
        for code in keys:
            for order in two_key:
                label = "{}{}".format(code, order)
                # struct requires the byte-order character first ('<i', not 'i<').
                fmt = order + code
                try:
                    if type == 1:
                        arr[label] = struct.unpack(fmt, data)
                    else:
                        arr[label] = struct.pack(fmt, data)
                except Exception as e:
                    arr[label] = "{}: {}".format('error', str(e))
        for label, value in arr.items():
            print(label, value)
        return arr
if __name__ == '__main__':
    # Script entry point: run the subscriber until the process is stopped.
    #
    # Sample payloads kept for manual decoding while debugging, e.g.
    #   print(MqttSub().analysisInfo(temp, 1))
    #
    # Unknown-topic record (base64 of the raw payload):
    #   {"orig_data": "fbZxYwEAMzVFMDRBQjE0NTVEMjA4MkVFMzFEMDczNjM0QTk5RTel", "TIMESTAMP": 0}
    # GPS payload:
    #   temp = b'G\xaeYd\x9ba\x01\x00gV\x00\x00y:\x0bO\xc5\x838@=`\x05\xbf5\x88]@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00`'
    # Info payload:
    #   temp = b'\xc1\x0c[d\x00\x00\x11\x00\x00\x1a\x84\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0089860418132180042387\x00\x04\x00\x00{'
    MqttSub().server_main()