import pandas as pd
import hs_udata as hs
from MySQLOperation import *
from sqlalchemy import create_engine
from datetime import datetime
import time
import configparser
import logging
import traceback
import warnings
warnings.filterwarnings("ignore")
class TB_Stock_List:
    """Create, refresh and read the stock-list table stored in MySQL.

    Connection parameters and the UData token come from one config file
    (section ``BD_Name``); the DROP / CREATE / DELETE statements for the table
    come from a second config file (section ``Table_Name``).

    Every public method catches its own errors, logs them to
    ``DB_MySQL_LOG.txt`` and returns normally, so callers never see an
    exception raised by the database or the data feed.
    """

    def __init__(self, MySQL_Config, BD_Name, Table_Config, Table_Name):
        """Load configuration and prepare both database handles.

        Args:
            MySQL_Config: Path of the config file holding the token and the
                MySQL connection parameters.
            BD_Name: Section of ``MySQL_Config`` to read; ``READ_DATA`` also
                uses it as the schema name.
            Table_Config: Path of the config file holding the table SQL.
            Table_Name: Section of ``Table_Config`` to read; also used as the
                table name when writing and reading data.
        """
        # Logging goes through the module itself; the attribute is kept so
        # existing code that reaches for ``self.logging`` keeps working.
        self.logging = logging
        self.logging.basicConfig(
            filename='DB_MySQL_LOG.txt',
            level=self.logging.DEBUG,
            format='%(asctime)s - %(levelname)s - %(message)s',
        )

        # --- UData token and MySQL connection parameters ---
        self.section1 = BD_Name
        db_config = configparser.ConfigParser()
        db_config.read(MySQL_Config)
        self.token = self._quoted_option(db_config, self.section1, 'token')
        self.host = self._quoted_option(db_config, self.section1, 'host')
        self.port = int(db_config.get(section=self.section1, option='port'))
        self.db = self._quoted_option(db_config, self.section1, 'db')
        self.user = self._quoted_option(db_config, self.section1, 'user')
        self.passwd = self._quoted_option(db_config, self.section1, 'passwd')

        # Project helper used to execute raw SQL statements.
        self.MySQL = MySQLOperation(self.host, self.port, self.db, self.user, self.passwd)
        # SQLAlchemy engine used by pandas.read_sql / DataFrame.to_sql.
        self.engine = create_engine(
            self._build_engine_url(self.user, self.passwd, self.host, self.port, self.db)
        )

        # --- Table-level SQL statements ---
        self.section2 = Table_Name
        table_config = configparser.ConfigParser()
        table_config.read(Table_Config)
        self.DROP_TABLE_SQL = self._quoted_option(table_config, self.section2, 'DROP_TABLE')
        self.CREATE_TABLE_SQL = self._quoted_option(table_config, self.section2, 'CREATE_TABLE')
        self.DELETE_DATA_SQL = self._quoted_option(table_config, self.section2, 'DELETE_DATA')
        self.logging.info('*********************{0}.{1}*********************'.format(self.section1, self.section2))

    @staticmethod
    def _quoted_option(config, section, option):
        """Return a config value that is stored as a Python expression."""
        # NOTE(security): eval() executes whatever the config file contains.
        # It is kept because the existing config format relies on it; only
        # load config files from trusted locations. If every value is a plain
        # quoted literal, ast.literal_eval would be a safer replacement.
        return eval(config.get(section=section, option=option))

    @staticmethod
    def _build_engine_url(user, passwd, host, port, db):
        """Build the SQLAlchemy MySQL URL, percent-encoding the credentials."""
        # Imported locally so this block does not depend on a file-level import.
        from urllib.parse import quote

        # Characters such as '@', '/' or ':' inside the user name or password
        # would otherwise be read as URL delimiters. quote(..., safe='') is
        # used rather than quote_plus so a space becomes '%20', not '+'.
        return 'mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(
            quote(str(user), safe=''),
            quote(str(passwd), safe=''),
            host,
            port,
            db,
        )

    def CREATE_TABLE(self):
        """Drop the table and create it again from the configured SQL.

        Failures are logged (message at INFO, traceback at DEBUG) and not
        re-raised.
        """
        try:
            # Foreign-key checks are switched off before the drop.
            # NOTE(review): they are never switched back on here; whether the
            # setting outlives this call depends on MySQLOperation - confirm.
            self.MySQL.Execute_Code('SET FOREIGN_KEY_CHECKS = 0')
            self.MySQL.Execute_Code(self.DROP_TABLE_SQL)
            self.MySQL.Execute_Code(self.CREATE_TABLE_SQL)
            self.logging.info('表{0}.{1},表格创建成功'.format(self.section1,self.section2))
        except Exception:
            # Exception (not a bare except) so Ctrl-C / SystemExit still stop
            # the program.
            self.logging.info('表{0}.{1},表格创建失败'.format(self.section1,self.section2))
            self.logging.debug(traceback.format_exc())

    def UPDATE_DATA(self):
        """Replace the table contents with a fresh stock list from UData.

        Steps: fetch the list, stamp every row with the current time, run the
        configured DELETE statement, append the new rows, then insert one row
        into ``udata.tb_update_records`` (table name, date, mode, row count,
        elapsed seconds, system time).

        Failures are logged (message at INFO, traceback at DEBUG) and not
        re-raised.
        """
        try:
            hs.set_token(self.token)
            started = time.perf_counter()

            df = hs.stock_list()
            dt = datetime.now()
            df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')

            # Full refresh: the old rows are deleted before the new ones are
            # written. NOTE(review): the two steps use different connections,
            # so a failed insert appears to leave the table empty - confirm.
            self.MySQL.Execute_Code(self.DELETE_DATA_SQL)
            # Write to the configured table name, the same one READ_DATA reads.
            df.to_sql(name=self.section2, con=self.engine, index=False, if_exists='append')

            elapsed_time = round(time.perf_counter() - started, 2)
            # All formatted values come from config or are computed locally.
            self.RECORDS_SQL = '''INSERT INTO udata.tb_update_records
            VALUES ('{0}','{1}','全量',{2},{3}, SYSDATE())'''.format(
                self.section2, dt.strftime('%Y-%m-%d'), len(df), elapsed_time)
            self.MySQL.Execute_Code(self.RECORDS_SQL)
            self.logging.info('表{0}.{1},数据更新成功'.format(self.section1,self.section2))
        except Exception:
            self.logging.info('表{0}.{1},数据更新失败'.format(self.section1,self.section2))
            self.logging.debug(traceback.format_exc())

    def READ_DATA(self, WhereCondition=''):
        """Read rows from the table into a DataFrame.

        Args:
            WhereCondition: Raw SQL text appended after
                ``SELECT * FROM <section1>.<section2>``, for example
                ``"WHERE code = '000001'"``. It is concatenated as-is, so it
                must never contain untrusted input.

        Returns:
            The query result as a ``pandas.DataFrame``, or the integer ``0``
            when the query fails (the failure is logged, not raised).
        """
        query = '''SELECT * FROM {0}.{1} '''.format(self.section1,self.section2) + WhereCondition
        try:
            result = pd.read_sql(query, con=self.engine)
        except Exception:
            self.logging.info('表{0}.{1},数据读取失败'.format(self.section1,self.section2))
            self.logging.debug(traceback.format_exc())
            # Existing callers expect 0 on failure, so it is kept.
            return 0
        self.logging.info('表{0}.{1},数据读取成功'.format(self.section1,self.section2))
        return result
if __name__ == '__main__':
    # Config files and the sections to read from each of them.
    MySQL_Config = 'DB_MySQL.config'
    BD_Name = 'udata'
    Table_Config = 'DB_Table.config'
    Table_Name = 'tb_stock_list'

    # Build the table handler from the two config files.
    TB_Stock_List_Main = TB_Stock_List(
        MySQL_Config=MySQL_Config,
        BD_Name=BD_Name,
        Table_Config=Table_Config,
        Table_Name=Table_Name,
    )

    # Recreate the table, refill it, then read everything back.
    TB_Stock_List_Main.CREATE_TABLE()
    TB_Stock_List_Main.UPDATE_DATA()
    data = TB_Stock_List_Main.READ_DATA()
# (stray page text "评论" / "Comments" - not part of the program)