850 lines
36 KiB
Python
850 lines
36 KiB
Python
# -*- coding:utf-8 -*-
|
||
import binascii
|
||
import datetime
|
||
import hashlib
|
||
import os
|
||
import threading
|
||
import traceback
|
||
|
||
from paho.mqtt import client as mqtt
|
||
import base64
|
||
import json
|
||
import struct
|
||
import time
|
||
import redis
|
||
import config
|
||
import binascii
|
||
|
||
class MqttSub():
    """Subscribe to ball-cart MQTT topics, decode the binary frames and mirror them into Redis.

    Topics have the shape ``<ball_cart_id>/<type>``.  Every frame ends with a
    one-byte XOR checksum over all preceding bytes.  Decoded results are pushed
    onto the Redis list ``config.redis_name_mqtt_sub_list``; successfully
    decoded frames are additionally stored under
    ``redis_name_mqtt_qc_data + "<id>_<type>"``.

    Multi-byte fields that the original code read with native struct codes
    (``i``, ``h``, ``l``, ``L``, ``d``) are read as explicit little-endian
    values of standard size, so decoding no longer depends on the host's
    ``long`` width.
    """

    is_debug = False

    redis_obj = None

    mqtt_obj = None

    # Key prefix for the latest decoded frame per cart and topic type.
    redis_name_mqtt_qc_data = "api:cxaibc:mqtt:data:"
    redis_name_mqtt_config = "api:cxaibc:mqtt:config"

    # Warning categories (the number is appended to redis_name_warn_status):
    #   1. GPS outside the store area
    #   2. GPS outside the safe area
    #   3. hardware link fault
    #   4. low battery
    #   5. cart tipped over
    redis_name_warn_status = "api:cxaibc:mqtt:warn_status:"

    # How many times on_message tries to process one message before dropping it.
    message_max_attempts = 3

    # Topic type -> name of the method that decodes it.
    _topic_handlers = {
        'Detail': 'analysisDetail',
        'Warn': 'analysisWarn',
        'GPS': 'analysisGPS',
        'Info': 'analysisInfo',
        'File': 'analysisFile',
        'FileResp': 'analysisFileResp',
        'sendAudio': 'analysisSendAudio',
    }

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------

    def _log_exception(self):
        """Append the exception being handled to ./mqtt_error/<YYYY_MM_DD>/mqtt_error.txt.

        A failure to write the log is reported on stderr instead of being
        raised, so logging can never mask the error that is being handled.
        """
        path_name = './mqtt_error/{}'.format(datetime.date.today().strftime('%Y_%m_%d'))
        try:
            os.makedirs(path_name, exist_ok=True)
            with open('{}/mqtt_error.txt'.format(path_name), 'a+') as file_obj:
                print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=file_obj)
                traceback.print_exc(file=file_obj)
        except OSError:
            traceback.print_exc()

    def _print_origin(self, e):
        """Debug-print the file and line recorded on the traceback of ``e``."""
        tb = e.__traceback__
        if tb is not None:
            self.printf(tb.tb_frame.f_globals.get("__file__"))  # file in which the exception surfaced
            self.printf(tb.tb_lineno)

    def _result(self, status, arr):
        """Build the result envelope shared by all decoders."""
        return {'status': status, 'data': json.dumps(arr), 'time': time.time()}

    def _failed(self, arr, e):
        """Log ``e``, record it on ``arr`` and return the envelope.

        NOTE(review): the envelope keeps ``status: True`` exactly as the
        original code did on this path; confirm that consumers expect that.
        """
        self._log_exception()
        arr['error'] = str(e)
        return self._result(True, arr)

    @staticmethod
    def _unpack(fmt, chunk, default=0):
        """Return the first value of ``struct.unpack(fmt, chunk)``, or ``default`` on failure."""
        try:
            return struct.unpack(fmt, chunk)[0]
        except Exception:
            return default

    @staticmethod
    def _byte(data, index, default=0):
        """Return the byte at ``index`` as an int, or ``default`` when data is too short."""
        try:
            return data[index:index + 1][0]
        except Exception:
            return default

    @staticmethod
    def _text(chunk):
        """Decode a fixed-width field as ISO-8859-1 and strip NUL padding from both ends."""
        try:
            return chunk.decode('ISO-8859-1').strip('\x00')
        except Exception:
            return ''

    def _with_checksum(self, payload):
        """Return ``payload`` followed by its one-byte XOR checksum."""
        return payload + bytes([self.return_x_o_r(payload)])

    def _send_ack(self, ball_cart_id):
        """Publish the acknowledgement frame to ``<ball_cart_id>/Ack``.

        The frame is a 4-byte little-endian timestamp, the byte 0xD1 and the
        XOR checksum.  Failures are logged and printed, never raised.
        """
        try:
            frame = struct.pack('<i', int(time.time())) + b"\xD1"
            self.mqtt_obj.publish("{}/Ack".format(ball_cart_id), self._with_checksum(frame))
        except Exception as e:
            self._log_exception()
            traceback.print_exc()
            self.printf("发布错误:", e)

    def _start(self, data, min_len=None, len_error=None):
        """Run the checks every decoder begins with.

        Returns ``(arr, None)`` when the frame may be decoded, otherwise
        ``(arr, failure_envelope)``.
        """
        arr = {'orig_data': self.toBase64(data)}
        if self.x_o_r(data) is False:
            arr['error'] = "x_o_r_no_success"
            return arr, self._result(False, arr)
        if min_len is not None and len(data) < min_len:
            arr['error'] = len_error
            return arr, self._result(False, arr)
        return arr, None

    # ------------------------------------------------------------------
    # connections and callbacks
    # ------------------------------------------------------------------

    def redis_connect(self):
        """Connect to Redis, retrying once per second until a ping succeeds."""
        while True:
            try:
                print('链接redis')
                self.redis_obj = redis.Redis(host=config.redis_server, port=config.redis_port,
                                             db=config.redis_database, decode_responses=True)
                self.redis_obj.ping()
                return
            except redis.ConnectionError:
                print('重新链接redis')
                time.sleep(1)

    def on_connect(self, client, userdata, flags, rc):
        """MQTT connect callback: raise on a failed connect, otherwise subscribe to client.topic_id."""
        rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
        if isinstance(rc, int) and 0 <= rc < len(rc_status):
            status = rc_status[rc]
        else:
            # A code outside the table used to raise IndexError here.
            status = "unknown return code {}".format(rc)
        if rc != 0:
            raise Exception(status)
        self.printf("mqtt: ", status, client.topic_id)
        self.redis_connect()
        client.subscribe(client.topic_id, qos=0)  # subscribe

    def on_message(self, client, userdata, msg):
        """MQTT message callback: decode one message and store the result in Redis.

        Returns False for malformed topics, ``Ack`` topics and on failure;
        returns None after a message has been processed.

        A failing message is retried after a one-second pause with a fresh
        Redis connection, at most ``message_max_attempts`` times in total.
        The original retried by recursing into itself without a bound, so a
        single undecodable message blocked the loop until the interpreter's
        recursion limit was hit.
        """
        try:
            self.printf(msg.topic, "原始数据:", msg.payload)
            data = msg.topic.split('/')
            if len(data) != 2:
                return False
            for _ in range(self.message_max_attempts):
                try:
                    return self._process_message(data, msg.payload)
                except Exception as e:
                    self._log_exception()
                    self.printf(e)
                    self._print_origin(e)
                    self.redis_obj = None  # force a reconnect on the next attempt
                    time.sleep(1)
            return False
        except Exception as e:
            self._log_exception()
            self.printf(e)
            self._print_origin(e)
            time.sleep(1)
            return False

    def _process_message(self, topic_parts, payload):
        """Decode ``payload`` according to the topic type and write the result to Redis."""
        cart_id, kind = topic_parts
        if self.redis_obj is None:
            self.redis_connect()
        if kind == 'Ack':
            return False
        handler_name = self._topic_handlers.get(kind)
        if handler_name is not None:
            res_json = getattr(self, handler_name)(payload, cart_id)
        else:
            res_json = self._unknown_topic_result(payload)
        self.redis_obj.lpush(config.redis_name_mqtt_sub_list, json.dumps({
            'data': res_json,
            'time': time.time(),
            'id': cart_id,
            'type': kind,
        }))
        if res_json.get('status') is True:
            key = "{}{}_{}".format(self.redis_name_mqtt_qc_data, cart_id, kind)
            self.redis_obj.set(key, json.dumps(res_json))
            if kind == 'Detail' or kind == 'GPS':
                # Expires after 60*5 seconds (5 minutes).
                # NOTE(review): the original comment said 10 minutes; confirm the intended TTL.
                self.redis_obj.expire(key, 60 * 5)
        self.printf(topic_parts, "解析结果:", res_json)
        return None

    def _unknown_topic_result(self, payload):
        """Build the ``status: False`` envelope used for topic types without a decoder."""
        temp_data = {'orig_data': self.toBase64(payload)}
        if self.x_o_r(payload) is False:
            temp_data["error"] = "x_o_r_no_success"
        if len(payload) > 4:
            temp_data['TIMESTAMP'] = self._unpack('<i', payload[0:4])
        return self._result(False, temp_data)

    # ------------------------------------------------------------------
    # decoders
    # ------------------------------------------------------------------

    def analysisDetail(self, data, ball_cart_id):
        """Decode a ``Detail`` frame, push the stored permission to the cart if it differs, then ack.

        NOTE(review): the four range/runtime counters are read big-endian
        (``>l``) as in the original, while the timestamp is little-endian;
        confirm against the device protocol.
        """
        arr = {'orig_data': self.toBase64(data)}
        if self.x_o_r(data) is False:
            self.printf('数据效验不通过')
            arr['error'] = "x_o_r_no_success"
            return self._result(False, arr)
        if len(data) < 66:
            arr['error'] = "len_data_no_q_65"
            return self._result(False, arr)
        try:
            arr['TIMESTAMP'] = self._unpack('<i', data[0:4])
            arr['IMEI'] = self._text(data[4:24])
            arr['SIMID'] = self._text(data[24:44])
            arr['CSQ'] = self._byte(data, 44)
            arr['OPERATION_ENABLE'] = self._byte(data, 45)
            arr['OPERATION_LEVEL'] = self._byte(data, 46)
            arr['MODE'] = self._byte(data, 47)
            arr['ELEC_PERCENT'] = self._byte(data, 48)
            arr['RANGE'] = self._unpack('>l', data[49:53])
            arr['RANGE_CURRENT'] = self._unpack('>l', data[53:57])
            arr['RUNTIME'] = self._unpack('>l', data[57:61])
            arr['RUNTIME_CURRENT'] = self._unpack('>l', data[61:65])
            self._sync_permission(ball_cart_id, arr)
            self._send_ack(ball_cart_id)
            return self._result(True, arr)
        except Exception as e:
            self.printf("解析失败:", e)
            self._print_origin(e)
            return self._failed(arr, e)

    def _sync_permission(self, ball_cart_id, arr):
        """Publish the permission stored in Redis when the cart reports a different one.

        The stored value is ``"<handle_type>:<pattern>"``; a missing or
        malformed value is treated as ``0:0``.  Publish errors are printed,
        not raised.
        """
        redis_name = "api:cxaibc:ball_cart:{}".format(ball_cart_id)
        get = self.redis_obj.get(redis_name)
        print(get, 'get-data')
        reported = "{}:{}".format(arr['OPERATION_ENABLE'], arr['OPERATION_LEVEL'])
        if get is not None and get == reported:
            return
        self.printf("需要去请求球车变更球车信息{}".format(ball_cart_id))
        handle_type = 0
        pattern = 0
        if get is not None and get != '':
            parts = get.split(':')
            if len(parts) >= 2:
                handle_type = int(parts[0])
                pattern = int(parts[1])
        try:
            time_base = int(time.time())
            digest = hashlib.md5(
                "{}+{}".format(str(ball_cart_id).zfill(16), time_base).encode("utf8")
            ).hexdigest().upper().encode()
            str_res = self._with_checksum(
                struct.pack('<i', time_base)
                + struct.pack('B', handle_type)
                + struct.pack('B', pattern)
                + digest
            )
            self.printf("发送数据:", handle_type, pattern, "变更请求数据:", str_res)
            self.mqtt_obj.publish("{}/Permission".format(ball_cart_id), str_res)
            self.redis_obj.set(redis_name, "{}:{}".format(handle_type, pattern))
        except Exception as e:
            traceback.print_exc()
            self.printf("发布错误:", e)

    def analysisWarn(self, data, ball_cart_id):
        """Decode a ``Warn`` frame, cache the three warning flags in Redis for 60 s, then ack.

        NOTE(review): the warn-status keys do not include the cart id, so all
        carts share them; confirm this is intended.
        """
        arr, failure = self._start(data, 12, "len_data_no_q_12")
        if failure is not None:
            return failure
        try:
            arr['TIMESTAMP'] = self._unpack('<i', data[0:4])
            arr['WARN'] = self._unpack('>l', data[4:8])
            arr['WARN_HARDWARE'] = self._byte(data, 8)
            arr['WARN_LOW_POWER'] = self._byte(data, 9)
            arr['WARN_FALLDOWN'] = self._byte(data, 10)
            for number, field in ((3, 'WARN_HARDWARE'), (4, 'WARN_LOW_POWER'), (5, 'WARN_FALLDOWN')):
                self.redis_obj.setex("{}{}".format(self.redis_name_warn_status, number), 60, arr[field])
            self._send_ack(ball_cart_id)
            return self._result(True, arr)
        except Exception as e:
            return self._failed(arr, e)

    def _coordinate(self, chunk, limit):
        """Decode a little-endian double; return 0 when it fails or lies outside ±limit."""
        try:
            value = struct.unpack('<d', chunk)[0]
            if float(value) > limit or float(value) < -limit:
                return 0
            return value
        except Exception:
            return 0

    def analysisGPS(self, data, ball_cart_id):
        """Decode a ``GPS`` frame and ack it.

        NOTE(review): UTC_DATE, UTC_TIME and ELEV are read big-endian as in
        the original; the sample frame kept near the ``__main__`` guard looks
        little-endian for these fields.  Confirm against the protocol.
        """
        arr, failure = self._start(data, 32, "len_data_no_q_32")
        if failure is not None:
            return failure
        try:
            arr['TIMESTAMP'] = self._unpack('<i', data[0:4])
            arr['UTC_DATE'] = self._unpack('>L', data[4:8])    # UTC date
            arr['UTC_TIME'] = self._unpack('>L', data[8:12])   # UTC time
            arr['LAT'] = self._coordinate(data[12:20], 90)     # latitude
            arr['LON'] = self._coordinate(data[20:28], 180)    # longitude
            arr['ELEV'] = self._unpack('>l', data[28:32])      # elevation
            arr['LAC'] = self._unpack('<h', data[32:34])       # location area code
            arr['CI'] = self._unpack('<l', data[34:38])        # cell id
            self._send_ack(ball_cart_id)
            return self._result(True, arr)
        except Exception as e:
            return self._failed(arr, e)

    def analysisInfo(self, data, ball_cart_id):
        """Decode an ``Info`` (hardware information) frame and ack it."""
        arr, failure = self._start(data, 32, "len_data_no_q_32")
        if failure is not None:
            return failure
        try:
            arr['TIMESTAMP'] = self._unpack('<i', data[0:4])
            # Single byte; the original unpacked it with 'l', which always
            # failed and left this field at 0.
            arr['DEVICE_ADDR'] = self._byte(data, 4)
            arr['VERSION_NUM'] = self._unpack('<l', data[5:9])
            arr['VERSION_DATE'] = self._unpack('<l', data[9:13])
            arr['IMEI'] = self._text(data[13:33])
            arr['SIMID'] = self._text(data[33:53])
            arr['PACK_LEN_MAX'] = self._unpack('<l', data[53:57])
            self._send_ack(ball_cart_id)
            return self._result(True, arr)
        except Exception as e:
            return self._failed(arr, e)

    def _parse_file_fields(self, data, arr, keep_decode_error):
        """Fill ``arr`` with the header and chunk fields shared by File and FileResp frames.

        When the chunk cannot be decoded as UTF-8, FILE_PACK_DATA becomes the
        error text if ``keep_decode_error`` is true, otherwise "".
        """
        arr['TIMESTAMP'] = self._unpack('<i', data[0:4])
        arr['SOURCE_ID'] = self._byte(data, 4)
        arr['FILE_TYPE'] = self._byte(data, 5)
        try:
            # NOTE(review): this strips trailing ASCII '0' characters, not NUL
            # padding (other text fields strip '\x00'); kept as in the
            # original, confirm which one the protocol uses.
            arr['FILE_NAME'] = data[6:38].decode('ISO-8859-1').rstrip('0')
        except Exception:
            arr['FILE_NAME'] = ""
        arr['FILE_SIZE'] = self._unpack('<L', data[38:42], '')
        arr['FILE_CHECK'] = self._unpack('<L', data[42:46], '')
        arr['FILE_PACK_OFFSET'] = self._unpack('<L', data[46:50])
        arr['FILE_PACK_LEN'] = self._unpack('<L', data[50:54])
        arr['FILE_PACK_CHECK'] = self._unpack('<L', data[54:58])
        try:
            chunk = data[58:-1]
            if chunk == b'\x00\x00' or chunk == b'\x00\x00\x00\x00':
                arr['FILE_PACK_DATA'] = ""
            else:
                arr['FILE_PACK_DATA'] = chunk.decode()
        except Exception as e:
            arr['FILE_PACK_DATA'] = str(e) if keep_decode_error else ""

    def analysisFile(self, data, ball_cart_id):
        """Decode a ``File`` (audio file chunk) frame."""
        arr, failure = self._start(data)
        if failure is not None:
            return failure
        try:
            self._parse_file_fields(data, arr, keep_decode_error=False)
            return self._result(True, arr)
        except Exception as e:
            return self._failed(arr, e)

    def analysisFileResp(self, data, ball_cart_id):
        """Decode a ``FileResp`` frame and delete the matching ``file_res`` key in Redis."""
        arr, failure = self._start(data)
        if failure is not None:
            return failure
        try:
            self._parse_file_fields(data, arr, keep_decode_error=True)
            redis_name = "api:cxaibc:mqtt:file_res:{}_{}".format(ball_cart_id, arr['FILE_NAME'])
            self.redis_obj.delete(redis_name)
            return self._result(True, arr)
        except Exception as e:
            return self._failed(arr, e)

    def analysisSendAudio(self, data, ball_cart_id):
        """Decode a ``sendAudio`` frame (audio content sent by the backend)."""
        arr, failure = self._start(data)
        if failure is not None:
            return failure
        try:
            arr['TIMESTAMP'] = self._unpack('<i', data[0:4])
            try:
                content = data[4:-1]
                if content == b'\x00\x00' or content == b'\x00\x00\x00\x00':
                    arr['DATA'] = ""
                else:
                    arr['DATA'] = content.decode()
            except Exception:
                arr['DATA'] = ""
            return self._result(True, arr)
        except Exception as e:
            return self._failed(arr, e)

    # ------------------------------------------------------------------
    # main loop
    # ------------------------------------------------------------------

    def server_main(self):
        """Run the subscriber forever, reconnecting one second after any failure.

        MQTT settings stored as JSON under ``redis_name_mqtt_config`` override
        the values in ``config``.  Only ``Exception`` is caught, so
        KeyboardInterrupt and SystemExit stop the process.
        """
        while True:
            try:
                if self.redis_obj is None:
                    self.redis_connect()
                get = self.redis_obj.get(self.redis_name_mqtt_config)
                if get is not None:
                    json_de = json.loads(get)
                    if json_de is not None:
                        config.mqtt_server = json_de['mqtt_server']
                        config.mqtt_port = int(json_de['mqtt_port'])
                        config.mqtt_user = json_de['mqtt_user']
                        config.mqtt_pass = json_de['mqtt_pass']
                self.is_debug = config.is_debug
                client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
                self.mqtt_obj = mqtt.Client(client_id, transport='tcp')
                self.mqtt_obj.username_pw_set(config.mqtt_user, config.mqtt_pass)  # broker credentials
                self.mqtt_obj.connect(config.mqtt_server, config.mqtt_port, 60)  # connect

                self.mqtt_obj.topic_id = "+/+"
                self.mqtt_obj.on_connect = self.on_connect  # connection-status callback
                self.mqtt_obj.on_message = self.on_message  # incoming-message callback
                self.mqtt_obj.loop_forever()  # blocks while the connection is being served
            except Exception as e:
                self._log_exception()
                self.printf("mqtt 启动失败:{}".format(e))
                self._print_origin(e)
                time.sleep(1)

    # ------------------------------------------------------------------
    # utilities
    # ------------------------------------------------------------------

    def x_o_r(self, data):
        """Return True when the last byte of ``data`` equals the XOR of all preceding bytes.

        Frames shorter than two bytes are rejected.  An empty frame used to
        raise ValueError here.
        """
        if len(data) < 2:
            return False
        checksum = 0
        for byte in data[:-1]:
            checksum ^= byte
        return checksum == data[-1]

    def return_x_o_r(self, data):
        """Return the XOR of all bytes in ``data``, or None when ``data`` is empty."""
        if not data:
            return None
        checksum = 0
        for byte in data:
            checksum ^= byte
        return checksum

    def toBase64(self, data):
        """Return ``data`` base64-encoded as text, or decoded as ISO-8859-1 if encoding fails."""
        try:
            return base64.b64encode(data).decode('ISO-8859-1')
        except Exception:
            return data.decode('ISO-8859-1')

    def printf(self, *data):
        """Print ``data`` with the current timestamp when ``is_debug`` is True."""
        if self.is_debug is True:
            print(data, time.time())

    def analysisUnPack(self, data, type=1):
        """Debug helper: try every struct type code with every byte-order prefix on ``data``.

        ``type == 1`` unpacks ``data``; any other value packs it.  The result
        maps ``"<code><prefix>"`` (for example ``"i<"``) to the value or to an
        ``"error: ..."`` string; every entry is also printed.  The format
        passed to struct puts the prefix first, as struct requires; the
        original put it last, so every prefixed variant failed.
        """
        keys = [
            'x', 'c', 'b', 'B', '?', 'h', 'H', 'i', 'I', 'l', 'L', 'q', 'Q', 'f', 'd', 's', 'p', 'P',
        ]
        two_key = [
            '', '<', '=', '>', '!', '@'
        ]
        arr = {}
        for v in keys:
            for v1 in two_key:
                label = "{}{}".format(v, v1)
                fmt = "{}{}".format(v1, v)
                try:
                    if type == 1:
                        arr[label] = struct.unpack(fmt, data)
                    else:
                        arr[label] = struct.pack(fmt, data)
                except Exception as e:
                    arr[label] = "{}: {}".format('error', str(e))
        for v in arr:
            print(v, arr[v])
        return arr
|
||
|
||
if __name__ == '__main__':
    # Sample frames kept from earlier manual decoding experiments:
    #   base64 frame of an unhandled topic type:
    #     'fbZxYwEAMzVFMDRBQjE0NTVEMjA4MkVFMzFEMDczNjM0QTk5RTel'
    #   GPS frame:
    #     b'G\xaeYd\x9ba\x01\x00gV\x00\x00y:\x0bO\xc5\x838@=`\x05\xbf5\x88]@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00`'
    #   Info frame, decoded with MqttSub().analysisInfo(temp, 1):
    #     b'\xc1\x0c[d\x00\x00\x11\x00\x00\x1a\x84\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0089860418132180042387\x00\x04\x00\x00{'

    # Start the subscriber; server_main() loops forever.
    MqttSub().server_main()
|
||
|
||
|
||
|