写点什么

Python 量化数据仓库搭建系列 2:Python 操作数据库

  • 2021 年 11 月 29 日
  • 本文字数:4521 字

    阅读完需:约 15 分钟

本系列教程为量化开发者,提供本地量化金融数据仓库的搭建教程与全套源代码。我们以恒有数(UDATA)金融数据社区为数据源,将金融基础数据落到本地数据库。教程提供全套源代码,包括历史数据下载与增量数据更新,数据更新任务部署与日常监控等操作。


在上一节讲述中,我们选择了 MySQL 作为本系列教程的数据库,故本文着重讲解 Python 操作 MySQL 的步骤,并封装方法。在文末简单介绍 Python 操作 MongoDB、SQLite、PostgreSQL 数据库;

一、pymysql 用法

1、安装 pymysql 模块

pip install pymysql
复制代码

2、连接数据库

from pymysql import * # 打开数据库连接,数据库参数可以在MySQL界面或数据库配置文件中查看conn = pymysql.connect(host = '数据库IP',                       port = '端口',                       user = '用户名',                       password = '密码',                       database='数据库名称')
# 使用 cursor() 方法创建一个游标对象 cursorcursor = conn.cursor()
# 在数据库操作执行完毕后,关闭数据库连接# conn.close()
复制代码

3、常见 SQL 代码执行

from pymysql import * # 执行SQL代码:建表、删表、插入数据def Execute_Code(sql_str):    # 打开数据库连接    conn = pymysql.connect(host = '127.0.0.1',port = 3306,user = 'root',                           password = '密码',database='udata')    # 使用 cursor() 方法创建一个游标对象 cursor    cursor = conn.cursor()    try:        # 使用execute()方法执行SQL        cursor.execute(sql)        # 提交到数据库执行        conn.commit()    except:        # 发生错误时回滚        conn.rollback()    # 关闭数据库连接    conn.close() 
复制代码


A、建表


sql_str = '''CREATE TABLE TB_Stock_List_Test (                        secu_code CHAR(20),                        hs_code CHAR(20),                        secu_abbr CHAR(20),                        chi_name CHAR(40),                        secu_market CHAR(20),                         listed_state CHAR(20),                        listed_sector CHAR(20),                        updatetime CHAR(20));'''Execute_Code(sql_str)
复制代码


B、插入数据


sql_str = '''INSERT INTO TB_Stock_List_Test(`secu_code`,`hs_code`,`secu_abbr`,`chi_name`,`secu_market`,`listed_state`,`listed_sector`,`updatetime`)VALUES('000001','000001.SZ','平安银行','平安银行股份有限公司','深圳证券交易所','上市','主板','2021-10-25 20:10:55');'''Execute_Code(sql_str)
复制代码


C、更新数据


sql_str = "UPDATE tb_stock_list SET updatetime = '2021-10-30 20:10:55' "Execute_Code(sql_str)
复制代码


D、删除数据


sql_str = 'DELETE FROM tb_stock_list'Execute_Code(sql_str)
复制代码


E、删除表格


sql_str = 'DROP TABLE IF EXISTS tb_stock_list'Execute_Code(sql_str)
复制代码

4、查询操作

def Select_Code(sql_str):    # 打开数据库连接  conn = pymysql.connect(host = '127.0.0.1',port = 3306,user = 'root',                           password = '密码',database='udata')    # 使用 cursor() 方法创建一个游标对象 cursor    cursor = conn.cursor()    # 使用execute()方法执行SQL    cursor.execute(sql_str)    # 获取所有记录列表    results = cursor.fetchall()    # 关闭数据库连接    conn.close()      return results
复制代码


sql_str = 'select * from tb_stock_list'results = Select_Code(sql_str)results
复制代码

5、方法封装

将上述用法,封装为自定义类,存为 MySQLOperation.py 文件,代码如下:


from pymysql import * # MySQL操作函数class MySQLOperation:    def __init__(self, host, port, db, user, passwd, charset='utf8'):        # 参数初始化        self.host = host        self.port = port        self.db = db        self.user = user        self.passwd = passwd        self.charset = charset    def open(self):        # 打开数据库连接        self.conn = connect(host=self.host,port=self.port                            ,user=self.user,passwd=self.passwd                            ,db=self.db,charset=self.charset)        # 使用 cursor() 方法创建一个游标对象 cursor        self.cursor = self.conn.cursor()    def close(self):        # 断开数据库连接        self.cursor.close()        self.conn.close()    def Execute_Code(self, sql):        # 执行SQL代码:建表、删表、插入数据        try:            self.open()               # 打开数据库连接            self.cursor.execute(sql)  # 使用execute()方法执行SQL            self.conn.commit()        # 提交到数据库执行             self.close()              # 断开数据库连接        except Exception as e:            self.conn.rollback()      # 发生错误时回滚            self.close()              # 断开数据库连接            print(e)    def Select_Code(self, sql):        # 执行SQL代码,查询数据        try:            self.open()                        # 打开数据库连接            self.cursor.execute(sql)           # 使用execute()方法执行SQL            result = self.cursor.fetchall()    # 获取所有记录列表            self.close()                       # 断开数据库连接            return result                      # 返回查询数据        except Exception as e:            self.conn.rollback()               # 发生错误时回滚            self.close()                       # 断开数据库连接            print(e)
复制代码


插入与查询用法如下,其余用法类似,此处不再赘述;


import pandas as pdhost='127.0.0.1'port=3306user='root'passwd="xxxx"db='udata'# 方法实例化MySQL = MySQLOperation(host, port, db, user, passwd)# 插入操作代码sql_str = '''INSERT INTO tb_stock_list(`secu_code`,`hs_code`,`secu_abbr`,`chi_name`,`secu_market`,`listed_state`,`listed_sector`,`updatetime`)VALUES('000001','000001.SZ','平安银行','平安银行股份有限公司','深圳证券交易所','上市','主板','2021-10-25 20:15:55');'''MySQL.Execute_Code(sql_str)# 查询数据sql_str = 'select * from tb_stock_list'results = MySQL.Select_Code(sql_str)results
复制代码

二、sqlalchemy 用法

由于上述 pymysql 用法已经可以满足大部分使用需求,sqlalchemy 实现功能与之类似。这里着重介绍一下基于 sqlalchemy 链接数据库的 pandas.to_sql 和 pandas.read_sql 操作。

1、安装 pymysql 模块

pip install sqlalchemy
复制代码

2、连接数据库

from sqlalchemy import create_enginehost='127.0.0.1'port = 3306user='root'password='密码'database='udata'engine = create_engine('mysql://{0}:{1}@{2}:{3}/{4}?charset=utf8'.format(user                                                                         ,password                                                                         ,host                                                                         ,port                                                                         ,database))
复制代码

3、pandas.to_sql

将 DataFrame 中的数据,写入 MySQL 数据库,代码示例如下:


import pandas as pd# 定义需要写入的数据,DataFrame格式data = pd.DataFrame([['000001','000001.SZ','平安银行','平安银行股份有限公司'                      ,'深圳证券交易所','上市','主板','2021-10-25 20:12:55'],                   ['000002','000002.SZ','万 科A','万科企业股份有限公司'                    ,'深圳证券交易所','上市','主板','2021-10-25 20:12:55']])# 列名赋值data.columns = ['secu_code','hs_code', 'secu_abbr', 'chi_name'                , 'secu_market', 'listed_state','listed_sector','updatetime']# 写入数据库data.to_sql(name='tb_stock_list', con=engine, index=False, if_exists='append')
复制代码


if_exists** 参数用于当目标表已经存在时的处理方式,默认是 fail,即目标表存在就失败。另外两个选项是 replace 表示替代原表,即删除再创建,append 选项仅添加数据。**

4、pandas.read_sql

从数据库中,将数据读取为 DataFrame,代码示例如下:


# 将sql查询结果,赋值为resultresult = pd.read_sql('''SELECT * FROM tb_stock_list ''', con=engine)result
复制代码

三、Python 操作其他常见数据库

1、MongoDB

(1)安装 pymongo:pip install pymongo


(2)操作简介


import pymongo# 连接MongoDBconn = pymongo.MongoClient(host='localhost',port=27017                           ,username='username', password='password')# 指定数据库db = conn['udata']  # db = client.udata# 指定集合collection = db['tb_stock_list']  # collection = db.tb_stock_list# 插入数据 insert_one()、insert_many()data1 = {}  # 集合,键值对,1条数据data2 = {}  # 集合,键值对,1条数据result = collection.insert_many([data1, data2])# result = collection.insert_one(data1)# 查询数据 find_one()、find()result = collection.find_one({'secu_code': '000001'})# 更新数据 update_one()、update()result = collection.update_one({'secu_code': '000001'}, {'$set': {'hs_code': '000001'}})# 删除数据 remove()、delete_one()和delete_many()result = collection.remove({'secu_code': '000001'})
复制代码

2、SQLite

(1)安装 sqlite3:pip install sqlite3


(2)操作简介


import sqlite3# 连接数据库conn = sqlite3.connect('udata.db')# 创建游标cursor = conn.cursor()# 执行SQLsql = "增减删等SQL代码"cursor.execute(sql)# 查询数据sql = "查询sql代码"values = cursor.execute(sql)# 提交事物conn.commit()# 关闭游标cursor.close()# 关闭连接conn.close()
复制代码

3、PostgreSQL

(1)安装 psycopg2:pip install psycopg2


(2)操作简介


import psycopg2# 连接数据库conn = psycopg2.connect(database="udata", user="postgres"                        , password="密码", host="127.0.0.1", port="5432")# 创建游标cursor = conn.cursor()# 执行SQLsql = "增减删等SQL代码"cursor.execute(sql)# 查询全部数据sql = "查询sql代码"cursor.execute(sql)rows = cursor.fetchall()# 事物提交conn.commit()# 关闭数据库连接conn.close()
复制代码


综上,Python 操作数据库的简要介绍就结束了;还有很多类型的数据库,Python 操作它们的过程大同小异,后续我也将会继续梳理相关资料。


下一节《Python 量化投资数据仓库搭建 3:数据落库代码封装》

发布于: 刚刚阅读数: 2
用户头像

还未添加个人签名 2018.11.07 加入

还未添加个人简介

评论

发布
暂无评论
Python量化数据仓库搭建系列2:Python操作数据库