写点什么

Python 量化数据仓库搭建系列 3:数据落库代码封装

  • 2021 年 11 月 30 日
  • 本文字数:5258 字

    阅读完需:约 17 分钟

本系列教程为量化开发者,提供本地量化金融数据仓库的搭建教程与全套源代码。我们以恒有数(UDATA)金融数据社区为数据源,将金融基础数据落到本地数据库。教程提供全套源代码,包括历史数据下载与增量数据更新,数据更新任务部署与日常监控等操作。


在上一节讲述中,我们封装了 Python 操作 MySQL 数据库的自定义类,存为 MySQLOperation.py 文件;本节内容操作数据库部分,将会调用 MySQLOperation 中的方法,以及 pandas.to_sql 和 pandas.read_sql 的操作。

一、恒有数(UDATA)操作简介

1、获取 Token

A、在恒有数官网(https://udata.hs.net)注册并登录,在订阅页面,下单免费的体验套餐;B、在右上角,头像下拉菜单中,进入总览页面,复制 Token;C、在数据页面,查看数据接口文档,获取接口名称、请求参数、返回参数和 Python 代码示例;

2、安装 hs_udata

pip install hs_udata
复制代码


使用示例如下:


import hs_udata as hs# 设置Tokenhs.set_token(token = 'xxxxxxxxxxxxxxx')  # 总览页面获取个人Token# 以分钟线行情为例,获取000001.SZ在2021-05-01至2021-06-01期间的分钟线数据# 接口文档见:https://udata.hs.net/datas/342/df = hs.stock_quote_minutes(en_prod_code="000001.SZ",begin_date="20210501",end_date="20210601")# 增加股票代码列df['hs_code']='000001.SZ'df.head()
复制代码


3586


其余接口使用过程与之类似;

二、数据落库示例

以股票列表(stock_list)为例,讲解建表、落库、查询等操作;全套代码见本文第三章;

1、准备工作

(1)在 MySQL 数据库中,创建数据库 udata,创建过程见第一讲《Python 量化数据仓库搭建系列 1:数据库安装与操作》;


(2)在 MySQL 数据库中,创建数据更新记录表 udata.tb_update_records,表结构如下:


3587


建表 SQL 如下:


CREATE TABLE udata.tb_update_records (                table_name CHAR(40),                data_date CHAR(20),                update_type CHAR(20),                data_number INT(20),                elapsed_time INT(20),                updatetime CHAR(20)                )
复制代码


(3)将 Token 与数据库参数写入配置文件 DB_MySQL.config,文件内容如下:


[udata]token='你的Token'host='127.0.0.1'port=3306user='root'passwd='密码'db='udata'
复制代码


(4)读取配置文件中的参数


import configparser# 读取配置文件中,恒有数和数据库参数configFilePath = 'DB_MySQL.config'section = 'udata'config = configparser.ConfigParser()config.read(configFilePath)# 读取 恒有数(UData) 的 tokentoken = eval(config.get(section=section, option='token'))# MySQL连接参数读取host = eval(config.get(section=section, option='host'))port = int(config.get(section=section, option='port'))db = eval(config.get(section=section, option='db'))user = eval(config.get(section=section, option='user'))passwd = eval(config.get(section=section, option='passwd'))
复制代码

2、建表

在社区数据页面,查看数据表返回参数字段,以股票列表(stock_list,https://udata.hs.net/datas/202/)为例:


3588


将建表、删除表格、清空数据的 SQL,写入配置文件 DB_Table.config,文件内容如下:


[tb_stock_list]DROP_TABLE='DROP TABLE IF EXISTS tb_stock_list'CREATE_TABLE='''CREATE TABLE tb_stock_list (                secu_code CHAR(20),                hs_code CHAR(20),                secu_abbr CHAR(20),                chi_name CHAR(40),                secu_market CHAR(20),                 listed_state CHAR(20),                listed_sector CHAR(20),                updatetime CHAR(20)                )'''DELETE_DATA = 'truncate table tb_stock_list'
复制代码


建表代码如下:


# MySQL操作实例化,pymysql连接设置,可打开、关闭、执行sql、执行sql读取数据MySQL = MySQLOperation(host, port, db, user, passwd)# 读取配置文件中,恒有数和数据库参数configFilePath = 'DB_Table.config'section = 'tb_stock_list'config = configparser.ConfigParser()config.read(configFilePath)DROP_TABLE = eval(config.get(section=section, option='DROP_TABLE'))CREATE_TABLE = eval(config.get(section=section, option='CREATE_TABLE'))DELETE_DATA = eval(config.get(section=section, option='DELETE_DATA'))# 删除表结构MySQL.Execute_Code(DROP_TABLE)# 创建表结构MySQL.Execute_Code(CREATE_TABLE)
复制代码

3、数据落库

import hs_udata as hsfrom sqlalchemy import create_enginefrom datetime import datetime# 设置tokenhs.set_token(token)# 获取 股票列表 数据df = hs.stock_list()# 在最后一列增加系统时间戳dt = datetime.now()df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S')# 由于股票列表数据为全量更新,数据插入之前,先清空表中数据MySQL.Execute_Code(DELETE_DATA)# sqlalchemy 连接设置,可用于pandas.read_sql、pandas.to_sqlengine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user,passwd,host,port,db))# 将数据写入到MySQL中的数据表df.to_sql(name='tb_stock_list', con=engine, index=False, if_exists='append')
复制代码


在数据库中查看结果如下:

4、读取数据

import pandas as pdresult = pd.read_sql('''SELECT * FROM tb_stock_list ''', con=engine)print(result.head())
复制代码

三、落库代码封装

将上述几步操作,封装到一起,定义并调用 python 中 class 类的属性和方法。代码中涉及主要技术点如下:(1)使用 pymysql、pandas.to_sql 和 pandas.read_sql 操作 MySQL 数据库;(2)使用 class 类的方法,集成建表、插入数据和查询数据的操作;(3)使用配置文件的方式,从本地文件中,读取数据库参数与表操作的 SQL 代码;(4)使用 try 容错机制,结

合日志函数,将执行日志打印到本地的 DB_MySQL_LOG.txt 文件;


import pandas as pdimport hs_udata as hsfrom MySQLOperation import *from sqlalchemy import create_enginefrom datetime import datetimeimport timeimport configparserimport loggingimport tracebackimport warningswarnings.filterwarnings("ignore")
class TB_Stock_List:
def __init__(self,MySQL_Config,BD_Name,Table_Config,Table_Name): # 创建日志 self.logging = logging self.logging.basicConfig(filename='DB_MySQL_LOG.txt', level=self.logging.DEBUG , format='%(asctime)s - %(levelname)s - %(message)s') # 读取配置文件中,恒有数和数据库参数 configFilePath = MySQL_Config self.section1 = BD_Name config = configparser.ConfigParser() config.read(configFilePath) # 读取 恒有数(UData) 的 token self.token = eval(config.get(section=self.section1, option='token')) # MySQL连接参数读取 self.host = eval(config.get(section=self.section1, option='host')) self.port = int(config.get(section=self.section1, option='port')) self.db = eval(config.get(section=self.section1, option='db')) self.user = eval(config.get(section=self.section1, option='user')) self.passwd = eval(config.get(section=self.section1, option='passwd')) # pymysql连接设置,可打开、关闭、执行sql、执行sql读取数据 self.MySQL = MySQLOperation(self.host, self.port, self.db, self.user, self.passwd) # sqlalchemy 连接设置,可用于pandas.read_sql、pandas.to_sql self.engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(self.user , self.passwd , self.host , self.port , self.db)) # 读取配置文件中,恒有数和数据库参数 configFilePath = Table_Config self.section2 = Table_Name config = configparser.ConfigParser() config.read(configFilePath) self.DROP_TABLE_SQL = eval(config.get(section=self.section2, option='DROP_TABLE')) self.CREATE_TABLE_SQL = eval(config.get(section=self.section2, option='CREATE_TABLE')) self.DELETE_DATA_SQL = eval(config.get(section=self.section2, option='DELETE_DATA'))
self.logging.info('*********************{0}.{1}*********************'.format(self.section1, self.section2))
def CREATE_TABLE(self): try: # 删除表结构 self.MySQL.Execute_Code('SET FOREIGN_KEY_CHECKS = 0') self.MySQL.Execute_Code(self.DROP_TABLE_SQL) # 创建表结构 self.MySQL.Execute_Code(self.CREATE_TABLE_SQL) self.logging.info('表{0}.{1},表格创建成功'.format(self.section1,self.section2)) except: self.logging.info('表{0}.{1},表格创建失败'.format(self.section1,self.section2)) self.logging.debug(traceback.format_exc())
def UPDATE_DATA(self): try: # 设置token hs.set_token(self.token) time_start = time.time() # 计时 # 获取 股票列表 数据 df = hs.stock_list() # 在最后一列增加系统时间戳 dt = datetime.now() df['updatetime'] = dt.strftime('%Y-%m-%d %H:%M:%S') # 由于股票列表数据为全量更新,数据插入之前,先清空表中数据 self.MySQL.Execute_Code(self.DELETE_DATA_SQL) # 将数据写入到MySQL中的数据表 df.to_sql(name='tb_stock_list', con=self.engine, index=False, if_exists='append') time_end = time.time() # 计时 elapsed_time = round(time_end-time_start,2) # 向mysql库中记录一条数据更新记录:表名,数据日期,更新方式,更新条数,更新耗时,系统时间 self.RECORDS_SQL = '''INSERT INTO udata.tb_update_records VALUES ('{0}','{1}','全量',{2},{3}, SYSDATE())'''.format(self.section2 ,dt.strftime('%Y-%m-%d'),len(df),elapsed_time) self.MySQL.Execute_Code(self.RECORDS_SQL) self.logging.info('表{0}.{1},数据更新成功'.format(self.section1,self.section2)) except: self.logging.info('表{0}.{1},数据更新失败'.format(self.section1,self.section2)) self.logging.debug(traceback.format_exc())
def READ_DATA(self, WhereCondition=''): try: result = pd.read_sql('''SELECT * FROM {0}.{1} '''.format(self.section1,self.section2) + WhereCondition, con=self.engine) self.logging.info('表{0}.{1},数据读取成功'.format(self.section1,self.section2)) return result except: self.logging.info('表{0}.{1},数据读取失败'.format(self.section1,self.section2)) self.logging.debug(traceback.format_exc()) return 0 if __name__ == '__main__': MySQL_Config = 'DB_MySQL.config' BD_Name = 'udata' Table_Config = 'DB_Table.config' Table_Name = 'tb_stock_list' # 实例化 TB_Stock_List_Main = TB_Stock_List(MySQL_Config,BD_Name,Table_Config,Table_Name) # 创建表结构 TB_Stock_List_Main.CREATE_TABLE() # 更新数据 TB_Stock_List_Main.UPDATE_DATA() # 读取数据 data = TB_Stock_List_Main.READ_DATA()
复制代码

四、代码及配置文件下载

(1)附件文件目录:

DB_MySQL.config

DB_Table.config

MySQLOperation.py

TB_Stock_List.py

(2)下载附件文件后,将配置文件中 Token 与密码替换为自己的。

(3)附件请见来源:恒生LIGHT云社区

发布于: 2 小时前阅读数: 7
用户头像

还未添加个人签名 2018.11.07 加入

还未添加个人简介

评论

发布
暂无评论
Python量化数据仓库搭建系列3:数据落库代码封装