742 lines
38 KiB
Python
742 lines
38 KiB
Python
# -*- coding:utf-8 -*-
|
||
"""
|
||
距离计算
|
||
"""
|
||
import copy
|
||
import datetime
|
||
import json
|
||
import os.path
|
||
import threading
|
||
import time
|
||
import traceback
|
||
|
||
import redis
|
||
import config_cart
|
||
from mysql_obj import mysql_obj
|
||
|
||
class CartDistance:
|
||
redis_obj = None # redis链接
|
||
store_id = 0 # 门店id
|
||
region_id = 0 # 区域id
|
||
number = None # 球车编号
|
||
redis_name_mqtt_qc_data = "api:cxaibc:mqtt:data:"
|
||
red_daolu = 50 # n米内都要泛红
|
||
warn_daolu = 100 # n米内都要泛黄
|
||
out_time = 30 # 车辆超时时间60秒
|
||
version = None
|
||
mysql_config = {
|
||
'host': config_cart.mysql_host,
|
||
'port': config_cart.mysql_port,
|
||
'user': config_cart.mysql_user,
|
||
'password': config_cart.mysql_pass,
|
||
'db': config_cart.mysql_db,
|
||
'charset': config_cart.mysql_charset,
|
||
'maxconnections': 70, # 连接池最大连接数量
|
||
}
|
||
console_send = config_cart.console_send
|
||
console_send_audio = config_cart.console_send_audio
|
||
db = None
|
||
|
||
def __init__(self, number=None, version=None):
|
||
if version is None:
|
||
self.version = time.time()
|
||
else:
|
||
self.version = version
|
||
self.number = number
|
||
self.redis_connect()
|
||
pass
|
||
|
||
"""
|
||
连接redis
|
||
"""
|
||
def redis_connect(self):
|
||
try:
|
||
# print('链接redis')
|
||
self.redis_obj = redis.Redis(host=config_cart.redis_server, port=config_cart.redis_port, db=config_cart.redis_database,
|
||
decode_responses=True)
|
||
self.redis_obj.ping()
|
||
except redis.ConnectionError as e:
|
||
print('重新链接redis')
|
||
time.sleep(1)
|
||
if os.path.exists('./distance_error') is False:
|
||
os.mkdir('./distance_error')
|
||
path_name = './distance_error/{}'.format(datetime.date.today().strftime('%Y_%m_%d'))
|
||
if os.path.exists(path_name) is False:
|
||
os.mkdir(path_name)
|
||
file_obj = open('{}/distance.txt'.format(path_name), 'a+')
|
||
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=file_obj)
|
||
traceback.print_exc(file=file_obj)
|
||
file_obj.close()
|
||
return self.redis_connect()
|
||
pass
|
||
|
||
"""
|
||
发送语音
|
||
"""
|
||
def sendAudio(self, number, audio_id):
|
||
try:
|
||
t = threading.Thread(target=os.system, args=("{} {},{}".format(self.console_send_audio, number, audio_id),))
|
||
t.start()
|
||
except threading.ThreadError as e:
|
||
print(e)
|
||
pass
|
||
|
||
"""
|
||
执行体
|
||
"""
|
||
def run(self):
|
||
if self.number is None:
|
||
return False
|
||
redis_lock_name = 'cxaibcbi:python:ball_number:{}_lock_{}'.format(self.number, self.version)
|
||
setnx = self.redis_obj.setnx(redis_lock_name, 1)
|
||
if setnx is False:
|
||
ttl = self.redis_obj.ttl(redis_lock_name)
|
||
if ttl == -1:
|
||
self.redis_obj.expire(redis_lock_name, 60 * 5)
|
||
print('还未执行完毕:{}:{}:{}'.format(self.number, self.version, ttl))
|
||
return False
|
||
# 获取当前车辆在哪个区域里执行
|
||
temp_data = []
|
||
with mysql_obj(self.mysql_config) as db:
|
||
self.db = db
|
||
sql = """
|
||
select a.store_id,a.region_id from cx_allot as a
|
||
inner join cx_allot_meta as am on a.id = am.allot_id
|
||
inner join cx_ball_cart as b on b.id = am.object_id
|
||
where a.`status` = 1 and am.type = 1 and a.is_delete = 0 and am.is_delete = 0 and
|
||
b.ball_number = '{}' and b.is_delete = 0
|
||
""".format(self.number)
|
||
self.db.cursor.execute(sql)
|
||
res = self.db.cursor.fetchall()
|
||
if len(res) != 0:
|
||
temp_data = res
|
||
pass
|
||
if len(temp_data) == 0:
|
||
self.db.conn.rollback()
|
||
self.redis_obj.delete(redis_lock_name)
|
||
return False
|
||
self.store_id = temp_data[0]['store_id']
|
||
self.region_id = temp_data[0]['region_id']
|
||
try:
|
||
print('执行车辆计算', self.number)
|
||
# 球道数据加入geo
|
||
redis_geo_qiudao_temp_name = "cxaibcbi:python:geo:qiudao_data:{}_{}".format(self.store_id, self.region_id)
|
||
# 球车数据加入geo
|
||
redis_geo_cart_temp_name = "cxaibcbi:python:geo:cart_data:{}_{}_{}".format(self.store_id, self.number, self.region_id)
|
||
# 车辆当前在此球道上的状态
|
||
redis_cart_number_temp_name = "cxaibcbi:python:geo:cart_number:{}_{}".format(self.store_id, self.region_id)
|
||
self.redis_obj.delete(redis_geo_qiudao_temp_name)
|
||
# 球道数据加入geo
|
||
qiudao_res = self.setQiudaoData(redis_geo_qiudao_temp_name)
|
||
if qiudao_res is False:
|
||
self.redis_obj.delete(redis_lock_name)
|
||
print('数据出错')
|
||
self.db.conn.rollback()
|
||
return False
|
||
# 车辆数据加入geo
|
||
qiudao_index = list(set(qiudao_res)).pop()
|
||
res = self.setAllotData(redis_geo_cart_temp_name, redis_cart_number_temp_name, qiudao_index, redis_geo_qiudao_temp_name)
|
||
# 计算出当前车辆范围内是否有车
|
||
number_status = {}
|
||
red_arr = [] # 泛红道路
|
||
warn_arr = [] # 泛黄道路
|
||
for v in res['data']:
|
||
get = self.redis_obj.get("{}_{}".format(redis_cart_number_temp_name, v))
|
||
# 获取车辆状态
|
||
if get is None:
|
||
continue
|
||
json_de = json.loads(get)
|
||
# 判定当前车辆在本点位是否超时
|
||
# 获取最近点位
|
||
one_lat = self.redis_obj.geopos(redis_geo_cart_temp_name, v)
|
||
one_all = self.redis_obj.georadius(redis_geo_qiudao_temp_name, one_lat[0][0], one_lat[0][1], 1000, 'km', False, False, False, None, 'ASC')
|
||
if len(one_all) == 0 or len(one_lat) == 0:
|
||
continue
|
||
all_user_number = int(res['user_arr'][res['key'][v]]) # 人数
|
||
user_min_number = int(qiudao_res[int(one_all[0])]['time']) # 每人可停留分钟
|
||
# gps_data = self.redis_obj.get("{}{}_{}".format(self.redis_name_mqtt_qc_data, self.number, "Detail"))
|
||
# print(gps_data)
|
||
# exit()
|
||
|
||
# print("人员数量:", all_user_number)
|
||
# print("每人停留分钟:", user_min_number)
|
||
# print('可停留时长:', all_user_number*user_min_number)
|
||
times = time.time() - json_de['time']
|
||
if v not in number_status.keys():
|
||
number_status[v] = {
|
||
'status': 0,
|
||
'msg': '',
|
||
'index': one_all[0],
|
||
'type': 0,
|
||
} # 拥堵情况,0.绿色,1.黄色,2.红色
|
||
if times > (all_user_number*user_min_number*60):
|
||
number_status[v] = {
|
||
'status': 1,
|
||
'msg': '车辆停留超时',
|
||
'index': one_all[0],
|
||
'type': 1,
|
||
} # 拥堵情况,0.绿色,1.黄色,2.红色
|
||
# print('停留超时,上报黄灯')
|
||
# continue
|
||
# 计算范围内的车辆
|
||
cart_data = self.redis_obj.georadiusbymember(redis_geo_cart_temp_name, v, 200, 'm')
|
||
add_cart = res['val'][res['key'][v]]
|
||
add_cart.append(v)
|
||
diff_data = list((set(cart_data)).difference((set(add_cart))))
|
||
# if 'status' in qiudao_res[int(one_all[0])].keys() and qiudao_res[int(one_all[0])]['status'] == 2:
|
||
# # print("范围内没有车辆,展示绿灯")
|
||
# # 车辆正在进入红色区域
|
||
# number_status[v] = {
|
||
# 'status': 1,
|
||
# 'msg': '车辆正要进入拥堵区',
|
||
# 'index': one_all[0],
|
||
# }
|
||
if len(diff_data) == 0:
|
||
# print("范围内没有车辆,展示绿灯")
|
||
# 范围内没有车辆,展示绿灯
|
||
if number_status[v]['status'] != 1:
|
||
continue
|
||
number_status[v] = {
|
||
'status': 0,
|
||
'msg': '',
|
||
'index': one_all[0],
|
||
'type': 0,
|
||
}
|
||
continue
|
||
cart_num = 0
|
||
for v1 in diff_data:
|
||
# 判断当前车辆是否在本车点位之前或相同,如果大于,则跳过,不进行计算
|
||
# 获取当前车辆最近点位
|
||
v1_lat = self.redis_obj.geopos(redis_geo_cart_temp_name, v1)
|
||
if len(v1_lat) == 0:
|
||
continue
|
||
v1_all = self.redis_obj.georadius(redis_geo_qiudao_temp_name, v1_lat[0][0], v1_lat[0][1], 1000, 'km', False, False, False, None, 'ASC')
|
||
if v1_all is None:
|
||
continue
|
||
if len(v1_lat) == 0:
|
||
# 本辆车大于判断车辆的点位
|
||
continue
|
||
if int(one_all[0]) < int(v1_all[0]):
|
||
continue
|
||
cart_num += 1
|
||
# 获取当前车辆到这辆车真实距离
|
||
distance = self.getTrueDistance(v, v1, redis_geo_qiudao_temp_name, redis_geo_cart_temp_name)
|
||
if distance is False:
|
||
continue
|
||
# 两车距离10米
|
||
if distance < 20:
|
||
# 判断当前车辆是否有超过1分钟等待时间
|
||
get = self.redis_obj.get("{}_{}".format(redis_cart_number_temp_name, v1))
|
||
# 获取车辆状态
|
||
if get is None:
|
||
continue
|
||
json_v1_de = json.loads(get)
|
||
if time.time() - json_v1_de['time'] > self.out_time and number_status[v]['status'] == 1:
|
||
number_status[v] = {
|
||
'status': 2,
|
||
'msg': '车辆停留超时',
|
||
'index': one_all[0],
|
||
'type': 1,
|
||
}
|
||
# 线路预警
|
||
# 获取当前点位
|
||
# 红道路
|
||
red_daolu = copy.deepcopy(self.red_daolu) # n米内都要泛红
|
||
warn_daolu = copy.deepcopy(self.warn_daolu) # n米内都要泛黄
|
||
red_arr.append(one_all[0])
|
||
t = int(one_all[0])
|
||
while True:
|
||
if int(one_all[0]) == 0:
|
||
# 当前点位为最开始点位
|
||
break
|
||
if int(one_all[0]) == 1:
|
||
# 当前点位为最开始点位
|
||
red_arr.append(int(one_all[0]) - 1)
|
||
break
|
||
if t-1 == -1:
|
||
break
|
||
red_arr.append(t)
|
||
# 计算下一个点位距离现在几米
|
||
next_dian = self.redis_obj.geodist(redis_geo_qiudao_temp_name, t, t-1, 'm')
|
||
red_daolu -= next_dian
|
||
if red_daolu <= 0:
|
||
break
|
||
t -= 1
|
||
# 黄道路
|
||
warn_arr.append(one_all[0])
|
||
t = int(one_all[0])
|
||
while True:
|
||
if int(one_all[0]) == 0:
|
||
# 当前点位为最开始点位
|
||
break
|
||
if int(one_all[0]) == 1:
|
||
# 当前点位为最开始点位
|
||
warn_arr.append(int(one_all[0]) - 1)
|
||
break
|
||
if t-1 == -1:
|
||
break
|
||
warn_arr.append(t)
|
||
# 计算下一个点位距离现在几米
|
||
next_dian = self.redis_obj.geodist(redis_geo_qiudao_temp_name, t, t-1, 'm')
|
||
warn_daolu -= next_dian
|
||
if warn_daolu <= 0:
|
||
break
|
||
t -= 1
|
||
print('前车停留超时,后车停留也超时,前车上报红灯')
|
||
continue
|
||
if time.time() - json_v1_de['time'] > self.out_time and number_status[v]['status'] == 0:
|
||
number_status[v1] = {
|
||
'status': 1,
|
||
'msg': '后车过快行驶',
|
||
'index': v1_all[0],
|
||
'type': 2,
|
||
}
|
||
print('前车停留未超时,后车停留超时,后车上报黄灯')
|
||
continue
|
||
if time.time() - json_v1_de['time'] < self.out_time and number_status[v]['status'] == 1:
|
||
if number_status[v]['status'] != 2:
|
||
number_status[v] = {
|
||
'status': 1,
|
||
'msg': '车辆停留超时',
|
||
'index': one_all[0],
|
||
'type': 1,
|
||
}
|
||
print('前车停留超时,后车停留未超时,前车上报黄灯')
|
||
continue
|
||
pass
|
||
if time.time() - json_v1_de['time'] < self.out_time and number_status[v]['status'] == 0:
|
||
if number_status[v]['status'] != 0:
|
||
number_status[v] = {
|
||
'status': 0,
|
||
'msg': '',
|
||
'index': one_all[0],
|
||
'type': 0,
|
||
}
|
||
print('前车停留超时,后车停留未超时,前车上报绿灯')
|
||
continue
|
||
pass
|
||
# 距离相近,报红灯
|
||
# 判断当前车辆是否停很久
|
||
pass
|
||
# 当前车辆停留时长超限
|
||
# 当前车辆之前都没有车辆
|
||
# 属于正常车辆,变更为正常
|
||
if cart_num == 0 and number_status[v]['status'] == 2:
|
||
print('当前车辆停留时长超限,属于正常车辆')
|
||
number_status[v] = {
|
||
'status': 0,
|
||
'msg': '',
|
||
'index': one_all[0],
|
||
'type': 0,
|
||
}
|
||
if len(red_arr) != 0:
|
||
for v in number_status:
|
||
if number_status[v]['status'] == 0 and int(number_status[v]['index']) in red_arr:
|
||
number_status[v]['status'] = 1
|
||
number_status[v]['msg'] = "车辆进入拥堵区"
|
||
number_status[v]['type'] = 3
|
||
pass
|
||
pass
|
||
pass
|
||
# 入库操作
|
||
# 设置爆红数据
|
||
redis_red_name = "cxaibcbi:python:luduan:{}".format(self.region_id)
|
||
json_en = {
|
||
'red': red_arr,
|
||
'warn': warn_arr,
|
||
}
|
||
self.redis_obj.setex(redis_red_name, 60*60*24, json.dumps(json_en))
|
||
print(number_status)
|
||
# 更新车辆数据
|
||
for v in number_status:
|
||
print(v)
|
||
get = self.redis_obj.get("{}_{}".format(redis_cart_number_temp_name, v))
|
||
# 获取车辆状态
|
||
if get is None:
|
||
json_de = number_status[v]
|
||
else:
|
||
json_de = json.loads(get)
|
||
json_de['qiudao_index'] = number_status[v]['index']
|
||
# 获取球道对应哪个球洞
|
||
json_de['qiudong_index'] = qiudao_res[int(json_de['qiudao_index'])]['bind_index']
|
||
json_de['status'] = number_status[v]['status']
|
||
json_de['msg'] = number_status[v]['msg']
|
||
json_de['type'] = number_status[v]['type']
|
||
json_de['res_time'] = int(time.time())
|
||
json_de['region_id'] = self.region_id
|
||
if 'skip_time' in json_de.keys():
|
||
if json_de['skip_time'] - time.time() > 0:
|
||
# 跳过报错,等待跳过时长
|
||
json_de['status'] = 0
|
||
json_de['type'] = 0
|
||
pass
|
||
self.redis_obj.set("{}_{}".format(redis_cart_number_temp_name, v), json.dumps(json_de))
|
||
if 'old_status' in json_de.keys():
|
||
# 有旧数据
|
||
if int(json_de['old_status']) == int(json_de['status']):
|
||
# 状态相同不入库
|
||
continue
|
||
pass
|
||
pass
|
||
json_de['old_status'] = json_de['status']
|
||
self.redis_obj.set("{}_{}".format(redis_cart_number_temp_name, v), json.dumps(json_de))
|
||
date = datetime.datetime.now().strftime('%Y_%m_%d')
|
||
redis_congestion_name = "cxaibcbi:python:cart_congestion_hash:{}_{}".format(self.store_id, date)
|
||
congestion_info = json.dumps(json_de)
|
||
# 结束所有数据
|
||
sql = """
|
||
update cx_allot_error set end_time = {},event_time = ({} - created_at) ,event_status = 2
|
||
where id =(
|
||
select * from(
|
||
select id from cx_allot_error where cart_id =(
|
||
select id from cx_ball_cart where ball_number = '{}' and is_delete = 0 and `status`=1 order by id desc limit 1
|
||
) limit 1
|
||
) as res
|
||
);
|
||
""".format(int(time.time()), int(time.time()), v)
|
||
self.redis_obj.lpush(config_cart.redis_name_mysql_sub_list, sql)
|
||
if int(json_de['status']) == 0:
|
||
# 没有事件,把事件下架
|
||
self.redis_obj.hdel(redis_congestion_name, v)
|
||
try:
|
||
t = threading.Thread(target=os.system, args=("{} {},{}".format(self.console_send, v, 1), ))
|
||
t.start()
|
||
except threading.ThreadError as e:
|
||
print(e)
|
||
# os.system("{} {},{}".format(self.console_send, v, 1))
|
||
pass
|
||
if int(json_de['status']) == 1:
|
||
# 黄灯,把事件更新最新事件
|
||
self.redis_obj.hset(redis_congestion_name, v, congestion_info)
|
||
# 创建黄灯事件
|
||
sql = """
|
||
insert ignore into cx_allot_error(`type`,`desc`,`cart_id`,`store_id`,`region_id`,`allot_id`,`created_at`,`is_delete`,`deleted_at`,`end_time`,`event_time`,`event_status`,`data`)
|
||
(
|
||
select '{}' as type,'{}' as `desc`,b.object_id as cart_id,a.store_id,a.region_id,a.id,{} as created_at,
|
||
0 as is_delete,0 as deleted_at,0 as end_time,0 as event_time,1 as event_status,'{}' as `data`
|
||
from cx_allot as a inner join cx_allot_meta as b on a.id = b.allot_id
|
||
where b.object_id = (
|
||
select id from cx_ball_cart where ball_number = '{}' and is_delete = 0 and `status` = 1 order by id desc limit 1
|
||
) and b.type = 1 and a.`status` = 1 and a.is_delete = 0 and b.is_delete = 0
|
||
);
|
||
""".format(number_status[v]['type'], number_status[v]['msg'], int(time.time()), congestion_info, v)
|
||
self.redis_obj.lpush(config_cart.redis_name_mysql_sub_list, sql)
|
||
# 加入报警次数
|
||
get = self.redis_obj.hget("cxaibcbi:api:getBallInfo:{}_{}".format(self.store_id,datetime.date.today().strftime('%Y_%m_%d')), 'warning_num')
|
||
if get is None:
|
||
get = 0
|
||
self.redis_obj.hset("cxaibcbi:api:getBallInfo:{}_{}".format(self.store_id,datetime.date.today().strftime('%Y_%m_%d')), 'warning_num', int(get)+1)
|
||
try:
|
||
t = threading.Thread(target=os.system, args=("{} {},{}".format(self.console_send, v, 2), ))
|
||
t.start()
|
||
except threading.ThreadError as e:
|
||
print(e)
|
||
pass
|
||
if int(json_de['status']) == 2:
|
||
# 红灯,把事件更新最新事件
|
||
self.redis_obj.hset(redis_congestion_name, v, congestion_info)
|
||
sql = """
|
||
insert ignore into cx_allot_error(`type`,`desc`,`cart_id`,`store_id`,`region_id`,`allot_id`,`created_at`,`is_delete`,`deleted_at`,`end_time`,`event_time`,`event_status`,`data`)
|
||
(
|
||
select '{}' as type,'{}' as `desc`,b.object_id as cart_id,a.store_id,a.region_id,a.id,{} as created_at,
|
||
0 as is_delete,0 as deleted_at,0 as end_time,0 as event_time,1 as event_status ,'{}' as `data`
|
||
from cx_allot as a inner join cx_allot_meta as b on a.id = b.allot_id
|
||
where b.object_id = (
|
||
select id from cx_ball_cart where ball_number = '{}' and is_delete = 0 and `status` = 1 order by id desc limit 1
|
||
) and b.type = 1 and a.`status` = 1 and a.is_delete = 0 and b.is_delete = 0
|
||
);
|
||
""".format(number_status[v]['type'], number_status[v]['msg'], int(time.time()), congestion_info, v)
|
||
self.redis_obj.lpush(config_cart.redis_name_mysql_sub_list, sql)
|
||
# 加入报警次数
|
||
get = self.redis_obj.hget("cxaibcbi:api:getBallInfo:{}_{}".format(self.store_id, datetime.date.today().strftime('%Y_%m_%d')), 'red_num')
|
||
if get is None:
|
||
get = 0
|
||
self.redis_obj.hset("cxaibcbi:api:getBallInfo:{}_{}".format(self.store_id, datetime.date.today().strftime('%Y_%m_%d')), 'red_num', int(get) + 1)
|
||
try:
|
||
t = threading.Thread(target=os.system, args=("{} {},{}".format(self.console_send, v, 3),))
|
||
t.start()
|
||
except threading.ThreadError as e:
|
||
print(e)
|
||
pass
|
||
pass
|
||
print('结束执行')
|
||
self.redis_obj.delete(redis_lock_name)
|
||
except redis.ConnectionError as e:
|
||
if os.path.exists('./distance_error') is False:
|
||
os.mkdir('./distance_error')
|
||
path_name = './distance_error/{}'.format(datetime.date.today().strftime('%Y_%m_%d'))
|
||
if os.path.exists(path_name) is False:
|
||
os.mkdir(path_name)
|
||
file_obj = open('{}/distance.txt'.format(path_name), 'a+')
|
||
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=file_obj)
|
||
traceback.print_exc(file=file_obj)
|
||
file_obj.close()
|
||
self.redis_connect()
|
||
self.redis_obj.delete(redis_lock_name)
|
||
return self.run()
|
||
except BaseException as e:
|
||
self.redis_obj.delete(redis_lock_name)
|
||
if os.path.exists('./distance_error') is False:
|
||
os.mkdir('./distance_error')
|
||
path_name = './distance_error/{}'.format(datetime.date.today().strftime('%Y_%m_%d'))
|
||
if os.path.exists(path_name) is False:
|
||
os.mkdir(path_name)
|
||
file_obj = open('{}/distance.txt'.format(path_name), 'a+')
|
||
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=file_obj)
|
||
traceback.print_exc(file=file_obj)
|
||
file_obj.close()
|
||
pass
|
||
pass
|
||
pass
|
||
|
||
"""
|
||
获取两点真实距离
|
||
"""
|
||
def getTrueDistance(self, one_number, two_number, qiudao_redis_name, cart_redis_name):
|
||
# 获取车辆经纬度
|
||
get_geo_dist = self.redis_obj.geodist(cart_redis_name, one_number, two_number)
|
||
if get_geo_dist <= 0:
|
||
# 两点距离为0
|
||
return 0
|
||
one_lat = self.redis_obj.geopos(cart_redis_name, one_number)
|
||
two_lat = self.redis_obj.geopos(cart_redis_name, two_number)
|
||
# 获取a点位中距离的所有点位
|
||
one_all = self.redis_obj.georadius(qiudao_redis_name, one_lat[0][0], one_lat[0][1], get_geo_dist, 'm', True, False, False, None, 'ASC')
|
||
if len(one_all) == 0:
|
||
return get_geo_dist
|
||
two_all = self.redis_obj.georadius(qiudao_redis_name, two_lat[0][0], two_lat[0][1], get_geo_dist, 'm', True, False, False, None, 'ASC')
|
||
if len(two_all) == 0:
|
||
return get_geo_dist
|
||
# 合集
|
||
one_arr = []
|
||
two_arr = []
|
||
for v in one_all:
|
||
one_arr.append(v[0])
|
||
for v in two_all:
|
||
two_arr.append(v[0])
|
||
# 获取交集
|
||
intersection = (list(set(one_arr).intersection(set(two_arr))))
|
||
# 获取第一个点位距离真实点的距离
|
||
distance = 0
|
||
# 第一个点
|
||
one_num = one_all[0][0]
|
||
# 第二个点
|
||
two_num = two_all[0][0]
|
||
if one_num == two_num:
|
||
# 两点相同,之间返回当前点位
|
||
return get_geo_dist
|
||
if int(one_num) > int(two_num):
|
||
max = int(one_num)
|
||
min = int(two_num)
|
||
else:
|
||
max = int(two_num)
|
||
min = int(one_num)
|
||
if max-min == 1:
|
||
return get_geo_dist
|
||
for v in range(max-min-1):
|
||
# 获取当前到下个点位的距离
|
||
geodist = self.redis_obj.geodist(qiudao_redis_name, min+v, min+v+1)
|
||
if geodist is not None:
|
||
distance += geodist
|
||
# 如果在交集里面,则减去
|
||
# 如果在交集外面,则加上
|
||
if one_num in intersection:
|
||
distance -= one_all[0][1]
|
||
else:
|
||
distance += one_all[0][1]
|
||
if two_num in intersection:
|
||
distance -= two_all[0][1]
|
||
else:
|
||
distance += two_all[0][1]
|
||
return distance
|
||
|
||
"""
|
||
设置球道数据
|
||
"""
|
||
def setQiudaoData(self, redis_name):
|
||
temp_data = []
|
||
sql = """
|
||
select * from cx_store_region where id = {}
|
||
""".format(self.region_id)
|
||
self.db.cursor.execute(sql)
|
||
res = self.db.cursor.fetchall()
|
||
if len(res) != 0:
|
||
temp_data = res
|
||
pass
|
||
if len(temp_data) == 0:
|
||
return False
|
||
json_ = json.loads(temp_data[0]['data'])
|
||
res = {}
|
||
# 加入geo
|
||
temp_i = 0
|
||
temp_i_ = 0
|
||
temp_arr = {}
|
||
for v in json_['qiudong_arr']:
|
||
temp_arr[temp_i_] = {'time': v['time'], 'bind_index': temp_i_}
|
||
temp_i_ += 1
|
||
for v in json_['qiudao_arr']:
|
||
coords = (str(v['lng']), str(v['lat']), temp_i)
|
||
self.redis_obj.geoadd(redis_name, coords)
|
||
if v['bind_index'] == '':
|
||
v['bind_index'] = 0
|
||
res[temp_i] = temp_arr[int(v['bind_index'])]
|
||
temp_i += 1
|
||
return res
|
||
|
||
"""
|
||
获取在使用车辆
|
||
"""
|
||
def setAllotData(self, redis_name, redis_cart_number_temp_name, end_index, redis_geo_qiudao_temp_name):
|
||
self.redis_obj.delete(redis_name)
|
||
temp_data = []
|
||
sql = """
|
||
select a.id,b.allot_id,b.type,
|
||
if(type=1,(select ball_number from cx_ball_cart as bc where bc.id = b.object_id),'') as ball_number
|
||
from cx_allot as a
|
||
inner join cx_allot_meta as b on a.id = b.allot_id
|
||
where a.is_delete = 0 and b.is_delete = 0 and a.status = 1 and a.store_id = {} and a.region_id = {}
|
||
""".format(self.store_id, self.region_id)
|
||
self.db.cursor.execute(sql)
|
||
res = self.db.cursor.fetchall()
|
||
if len(res) != 0:
|
||
temp_data = res
|
||
pass
|
||
|
||
# 加入geo
|
||
temp_cart = {}
|
||
temp_key_cart = {}
|
||
temp_cart_arr = []
|
||
temp_redis_lat_lng = "cxaibcbi:python:cart_debug"
|
||
temp_latlng_redis = self.redis_obj.get(temp_redis_lat_lng)
|
||
temp_latlng = {}
|
||
if temp_latlng_redis is not None:
|
||
# 模拟经纬度
|
||
json_de = json.loads(temp_latlng_redis)
|
||
for v in json_de:
|
||
temp_latlng[v['number']] = {
|
||
'lon': v['lng'],
|
||
'lat': v['lat'],
|
||
}
|
||
user_arr = {}
|
||
for v in temp_data:
|
||
if v['type'] == 1:
|
||
if v['id'] not in user_arr.keys():
|
||
user_arr[v['id']] = 0
|
||
user_arr[v['id']] += 1
|
||
if v['type'] != 1:
|
||
continue
|
||
print("运行车辆:",v['ball_number'])
|
||
# 获取到车辆信息
|
||
if v['ball_number'] in temp_latlng.keys():
|
||
# 如果后台设置有数据,则获取
|
||
lat = temp_latlng[v['ball_number']]['lat']
|
||
lng = temp_latlng[v['ball_number']]['lon']
|
||
coords = (str(lng), str(lat), v['ball_number'])
|
||
else:
|
||
gps_data = self.redis_obj.get("{}{}_{}".format(self.redis_name_mqtt_qc_data, v['ball_number'], "GPS"))
|
||
if gps_data is None:
|
||
# 跳过数据
|
||
continue
|
||
else:
|
||
gps_data = json.loads(json.loads(gps_data)['data'])
|
||
lat = gps_data['LAT']
|
||
lng = gps_data['LON']
|
||
coords = (str(lng), str(lat), v['ball_number'])
|
||
self.redis_obj.geoadd(redis_name, coords)
|
||
# 同一组数据加入
|
||
if v['id'] not in temp_cart.keys():
|
||
temp_cart[v['id']] = []
|
||
temp_cart[v['id']].append(v['ball_number'])
|
||
temp_key_cart[v['ball_number']] = v['id']
|
||
temp_cart_arr.append(v['ball_number'])
|
||
get = self.redis_obj.get("{}_{}".format(redis_cart_number_temp_name, v['ball_number']))
|
||
if get is None:
|
||
# 创建初始数据
|
||
redis_data = {
|
||
'lat': lat,
|
||
'lng': lng,
|
||
'time': time.time(),
|
||
'deng_status': 0, #车灯状态,0.正常,1.预警,2.报警
|
||
'qiudao_index': None,# 最近球道id
|
||
}
|
||
print('创建初始数据', v['ball_number'])
|
||
else:
|
||
redis_data = json.loads(get)
|
||
if 'dian_time' not in redis_data.keys():
|
||
redis_data['dian_time'] = redis_data['time']
|
||
if int(time.time()) - int(redis_data['dian_time']) > 60 and 'qiudong_index' in redis_data.keys():
|
||
# 可以增加点的时间
|
||
temp_redis_name = 'cxaibcbi:python:geo:cart_number:dong_index_{}:{}_{}_{}'.format(datetime.date.today().strftime('%Y_%m_%d'), self.store_id, self.region_id, redis_data['qiudong_index'])
|
||
res = self.redis_obj.incrby(temp_redis_name, int(time.time()) - int(redis_data['dian_time']))
|
||
redis_data['dian_time'] = int(time.time())
|
||
|
||
self.redis_obj.setex("{}_{}".format(redis_cart_number_temp_name, v['ball_number']), 60*5, json.dumps(redis_data))
|
||
# 判断当前车辆是否有移动过
|
||
if redis_data['lat'] != lat or redis_data['lng'] != lng:
|
||
# 计算移动距离,
|
||
temp_redis_name = 'cxaibcbi:python:geo:cart_number:temp:{}_{}_{}'.format(self.store_id, self.region_id, v['ball_number'])
|
||
self.redis_obj.delete(temp_redis_name)
|
||
one_geo_data = (str(redis_data['lng']), str(redis_data['lat']), 'one')
|
||
two_geo_data = (str(lng), str(lat), 'two')
|
||
self.redis_obj.geoadd(temp_redis_name, one_geo_data)
|
||
self.redis_obj.geoadd(temp_redis_name, two_geo_data)
|
||
# 获取两点距离
|
||
geodist = self.redis_obj.geodist(temp_redis_name, 'one', 'two')
|
||
if geodist is not None and geodist > 15:
|
||
# 移动
|
||
redis_data['lat'] = lat
|
||
redis_data['lng'] = lng
|
||
redis_data['time'] = time.time()
|
||
# redis_data['dian_time'] = int(time.time())
|
||
pass
|
||
else:
|
||
# 暂未移动
|
||
pass
|
||
# 如果最近球洞为None,则获取最近id
|
||
# if 'qiudao_index' not in redis_data.keys() or redis_data['qiudao_index'] is None:
|
||
# redis_data['qiudao_index'] = 0
|
||
# # 获取最近点位
|
||
# one_all = self.redis_obj.georadius(redis_geo_qiudao_temp_name, redis_data['lng'], redis_data['lat'], 1000, 'km', False, False, False, None, 'ASC')
|
||
# # 获取点位绑定球洞
|
||
# if len(one_all) == 0:
|
||
# redis_data['qiudao_index'] = 0
|
||
# else:
|
||
# redis_data['qiudao_index'] = one_all[0]
|
||
# pass
|
||
self.redis_obj.setex("{}_{}".format(redis_cart_number_temp_name, v['ball_number']), 60*60*24, json.dumps(redis_data))
|
||
# 返回当前车辆按最后一个点位由近到远数据
|
||
# 获取经纬度
|
||
geopos = self.redis_obj.geopos(redis_geo_qiudao_temp_name, end_index)
|
||
if geopos is not None and len(geopos) != 0:
|
||
georadius = self.redis_obj.georadius(redis_name, geopos[0][0], geopos[0][1], 100000, 'km', False, False, False, None, 'ASC')
|
||
if georadius is None:
|
||
temp_cart_arr = []
|
||
else:
|
||
temp_cart_arr = georadius
|
||
pass
|
||
# 当前车辆信息存储
|
||
redis_cart_all_name = "cxaibcbi:python:cart:data:{}_{}".format(self.store_id, self.region_id)
|
||
self.redis_obj.setex(redis_cart_all_name, 30, json.dumps(temp_cart_arr))
|
||
return {
|
||
'key': temp_key_cart,
|
||
'val': temp_cart,
|
||
'data': temp_cart_arr,
|
||
'user_arr': user_arr,
|
||
}
|
||
|
||
if __name__ == '__main__':
|
||
while True:
|
||
with mysql_obj(CartDistance().mysql_config) as db:
|
||
sql = """
|
||
select ball_number from cx_ball_cart where is_delete = 0
|
||
"""
|
||
db.cursor.execute(sql)
|
||
res = db.cursor.fetchall()
|
||
db.conn.commit()
|
||
if len(res) != 0:
|
||
for v in res:
|
||
CartDistance(v['ball_number']).run()
|
||
print(v)
|
||
pass
|
||
time.sleep(5)
|