import pandas as pdimport hs_udata as hsfrom MySQLOperation import *from sqlalchemy import create_enginefrom datetime import datetimeimport timeimport configparserimport loggingimport tracebackimport warningswarnings.filterwarnings("ignore")
class TB_Stock_List:
def __init__(self,MySQL_Config,BD_Name,Table_Config,Table_Name): # 创建日志 self.logging = logging self.logging.basicConfig(filename='DB_MySQL_LOG.txt', level=self.logging.DEBUG , format='%(asctime)s - %(levelname)s - %(message)s') # 读取配置文件中,恒有数和数据库参数 configFilePath = MySQL_Config self.section1 = BD_Name config = configparser.ConfigParser() config.read(configFilePath) # 读取 恒有数(UData) 的 token self.token = eval(config.get(section=self.section1, option='token')) # MySQL连接参数读取 self.host = eval(config.get(section=self.section1, option='host')) self.port = int(config.get(section=self.section1, option='port')) self.db = eval(config.get(section=self.section1, option='db')) self.user = eval(config.get(section=self.section1, option='user')) self.passwd = eval(config.get(section=self.section1, option='passwd')) # pymysql连接设置,可打开、关闭、执行sql、执行sql读取数据 self.MySQL = MySQLOperation(self.host, self.port, self.db, self.user, self.passwd) # sqlalchemy 连接设置,可用于pandas.read_sql、pandas.to_sql self.engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(self.user , self.passwd , self.host , self.port , self.db)) # 读取配置文件中,恒有数和数据库参数 configFilePath = Table_Config self.section2 = Table_Name config = configparser.ConfigParser() config.read(configFilePath) self.DROP_TABLE_SQL = eval(config.get(section=self.section2, option='DROP_TABLE')) self.CREATE_TABLE_SQL = eval(config.get(section=self.section2, option='CREATE_TABLE')) self.DELETE_DATA_SQL = eval(config.get(section=self.section2, option='DELETE_DATA'))
self.logging.info('*********************{0}.{1}*********************'.format(self.section1, self.section2))
def CREATE_TABLE(self): try: # 删除表结构 self.MySQL.Execute_Code('SET FOREIGN_KEY_CHECKS = 0') self.MySQL.Execute_Code(self.DROP_TABLE_SQL) # 创建表结构 self.MySQL.Execute_Code(self.CREATE_TABLE_SQL) self.logging.info('表{0}.{1},表格创建成功'.format(self.section1,self.section2)) except: self.logging.info('表{0}.{1},表格创建失败'.format(self.section1,self.section2)) self.logging.debug(traceback.format_exc())
def UPDATE_DATA(self): try: # 设置token hs.set_token(self.token) time_start = time.time() # 计时 # 获取 股票列表 数据 df = hs.stock_list() # 在最后一列增加系统时间戳 dt = datetime.now() df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S') # 由于股票列表数据为全量更新,数据插入之前,先清空表中数据 self.MySQL.Execute_Code(self.DELETE_DATA_SQL) # 将数据写入到MySQL中的数据表 df.to_sql(name='tb_stock_list', con=self.engine, index=False, if_exists='append') time_end = time.time() # 计时 elapsed_time = round(time_end-time_start,2) # 向mysql库中记录一条数据更新记录:表名,数据日期,更新方式,更新条数,更新耗时,系统时间 self.RECORDS_SQL = '''INSERT INTO udata.tb_update_records VALUES ('{0}','{1}','全量',{2},{3}, SYSDATE())'''.format(self.section2 ,dt.strftime('%Y-%m-%d'),len(df),elapsed_time) self.MySQL.Execute_Code(self.RECORDS_SQL) self.logging.info('表{0}.{1},数据更新成功'.format(self.section1,self.section2)) except: self.logging.info('表{0}.{1},数据更新失败'.format(self.section1,self.section2)) self.logging.debug(traceback.format_exc())
def READ_DATA(self, WhereCondition=''): try: result = pd.read_sql('''SELECT * FROM {0}.{1} '''.format(self.section1,self.section2) + WhereCondition, con=self.engine) self.logging.info('表{0}.{1},数据读取成功'.format(self.section1,self.section2)) return result except: self.logging.info('表{0}.{1},数据读取失败'.format(self.section1,self.section2)) self.logging.debug(traceback.format_exc()) return 0 if __name__ == '__main__': MySQL_Config = 'DB_MySQL.config' BD_Name = 'udata' Table_Config = 'DB_Table.config' Table_Name = 'tb_stock_list' # 实例化 TB_Stock_List_Main = TB_Stock_List(MySQL_Config,BD_Name,Table_Config,Table_Name) # 创建表结构 TB_Stock_List_Main.CREATE_TABLE() # 更新数据 TB_Stock_List_Main.UPDATE_DATA() # 读取数据 data = TB_Stock_List_Main.READ_DATA()
评论