cxfoot/cart_mqtt_sub/mysql_sub.py
2023-10-24 14:54:18 +08:00

95 lines
3.3 KiB
Python

# -*- coding:utf-8 -*-
import datetime
import os
import time
import traceback
import redis
import config
import config_cart
from mysql_obj import mysql_obj
class Mysql():
mysql_config = {
'host': config_cart.mysql_host,
'port': config_cart.mysql_port,
'user': config_cart.mysql_user,
'password': config_cart.mysql_pass,
'db': config_cart.mysql_db,
'charset': config_cart.mysql_charset,
'maxconnections': 70, # 连接池最大连接数量
}
"""
连接redis
"""
def redis_connect(self):
try:
print('链接redis')
self.redis_obj = redis.Redis(host=config.redis_server, port=config.redis_port, db=config.redis_database, decode_responses=True)
self.redis_obj.ping()
except redis.ConnectionError as e:
if os.path.exists('./redis_error') is False:
os.mkdir('./redis_error')
path_name = './redis_error/{}'.format(datetime.date.today().strftime('%Y_%m_%d'))
if os.path.exists(path_name) is False:
os.mkdir(path_name)
file_obj = open('{}/redis_error.txt'.format(path_name), 'a+')
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=file_obj)
traceback.print_exc(file=file_obj)
file_obj.close()
print('重新链接redis')
time.sleep(1)
return self.redis_connect()
pass
def run(self):
self.version = time.time()
while True:
try:
self.is_debug = config.is_debug
self.redis_connect()
while True:
data = self.redis_obj.rpop(config_cart.redis_name_mysql_sub_list)
if data is None:
time.sleep(1)
continue
self.printf(data)
# 执行sql
with mysql_obj(self.mysql_config) as db:
db.cursor.execute(data)
db.conn.commit()
pass
time.sleep(1)
except BaseException as e:
if os.path.exists('./mysql_error') is False:
os.mkdir('./mysql_error')
path_name = './mysql_error/{}'.format(datetime.date.today().strftime('%Y_%m_%d'))
if os.path.exists(path_name) is False:
os.mkdir(path_name)
file_obj = open('{}/mysql_error.txt'.format(path_name), 'a+')
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), file=file_obj)
traceback.print_exc(file=file_obj)
file_obj.close()
self.printf(e)
self.printf(e.__traceback__.tb_frame.f_globals["__file__"]) # 发生异常所在的文件
self.printf(e.__traceback__.tb_lineno)
time.sleep(1)
pass
pass
"""
打印信息
"""
def printf(self, *data):
print(data, time.time())
return False
if self.is_debug is True:
print(data, time.time())
pass
if __name__ == '__main__':
Mysql().run()