背景
之前一直使用 tornado 做项目,数据库一直使用 mongo 与 redis 居多,凭借其优异的异步特性工作的也很稳定高效,最近的项目需要使用 mysql ,由于之前在使用 mongo 与 redis 时所使用的 moto 与 aioredis 来异步的执行数据库操作,所以在网上查询了异步操作 mysql 的库, 本文记录一下异步操作中所遇到的问题与相应的解决方案。
Aiomysql 介绍
我们在使用 tornado 开发网站的时候,利用 python3 中新加入的异步关键词 async/await , 我们使用各种异步操作为来执行各种异步的操作,如使用 aiohttp 来代替 requests 来执行异步的网络请求操作,使用 motor 来代替同步的 pymongo 库来操作 mongo 数据库,同样,我们在开发同步的 python 程序时,我们会使用 PyMySQL 来操作 mysql 数据库,同样,我们会使用 aiomysql 来异步操作 mysql 数据库。
Aiomysql 连接
docker run --name mysql -e MYSQL_ROOT_PASSWORD=123456 -p 3306:3306 -d mysql:5.7
复制代码
我们先准备一些测试数据,创建一个 mytest 的数据库,创建一个 user 表,里面有三个字段,id, username, age, 简单的三个字段,并且添加两条数据。
#coding: utf-8
import aiomysql
import asyncio
loop = asyncio.get_event_loop()
async def test():
conn = await aiomysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
db='mytest',
loop=loop
)
cur = await conn.cursor()
await cur.execute("select * from user")
r = await cur.fetchall()
if r:
for i in r:
print(i)
else:
print("no data")
await cur.close()
conn.close()
loop.run_until_complete(test())
复制代码
我们来看下代码,来顺一下执行流程
1. 创建连接
首先我们使用 aiomysql.connect() 创建一个连接对象 conn,代码里只是使用了最常用的连接选项,这个 connect() 方法返回一个 Connection 类对象,这个对象里的参数非常多,我们在后面的代码中,如果遇到会进行相应的介绍。
2. 创建游标
之后我们使用 conn 这个对象的 cursor 方法获取 Cursor 对象 cur,我们只有使用 cursor 对象才能对数据库进行各种操作。
3. 执行 SQL 语句
我们使用 cur 对象的 execute() 方法执行 SQL 语句。这里执行 select * from user ,这个方法返回影响的行数,对于查询而言,是命中查询的数据量,我们也可以根据这里的返回值,如果是 0 的话则说明没有符合查询条件的数据。
cur = await conn.cursor()
count = await cur.execute("select * from user where id = 4")
print("count:{}".format(count))
if count:
r = await cur.fetchall()
for i in r:
print(i)
else:
print("no data")
await cur.close()
conn.close()
复制代码
5. 关闭连接 conn
注意 conn 对象的关闭函数不是协程,直接调用 close() 即可。
async with conn.cursor() as cur:
count = await cur.execute("select * from user")
if count:
r = await cur.fetchall()
for i in r:
print(i)
else:
print("no user")
复制代码
Aiomysql 简单的 CURD
上面我们简单地使用游标对象进行了查询,这节我们来看看更多 CURD 操作,其实这里已经和 aiomysql 没有太多的关系,主要是考查各位的 mysql 能力了,一个 execute 方法走天下。但是这里我们来看一个老生常谈的问题,sql 注入问题。
SQL 注入的问题
username = "yyx"
async with conn.cursor() as cur:
sql = "select * from user where username = '%s'" % username
print(sql)
count = await cur.execute(sql)
if count:
r = await cur.fetchall()
for i in r:
print(i)
else:
print("no user")
复制代码
如何避免 SQL 注入
async def execute(self, query, args=None):
"""Executes the given operation
Executes the given operation substituting any markers with
the given parameters.
For example, getting all rows where id is 5:
cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
:param query: ``str`` sql statement
:param args: ``tuple`` or ``list`` of arguments for sql query
:returns: ``int``, number of rows that has been produced of affected
"""
conn = self._get_db()
while (await self.nextset()):
pass
if args is not None:
query = query % self._escape_args(args, conn)
await self._query(query)
self._executed = query
if self._echo:
logger.info(query)
logger.info("%r", args)
return self._rowcount
execute有二个参数,一个是query, 另外是args,我们看注释,query是 sql的语句, args是 tulpe 或者 list 类型的参数。如果args非空,脚本会通过 query = query % self._escape_args(args, conn) 重新组织query, 再来看下 _escape_args(args, conn) 的实现
def _escape_args(self, args, conn):
if isinstance(args, (tuple, list)):
return tuple(conn.escape(arg) for arg in args)
elif isinstance(args, dict):
return dict((key, conn.escape(val)) for (key, val) in args.items())
else:
# If it's not a dictionary let's try escaping it anyways.
# Worst case it will throw a Value error
return conn.escape(args)
复制代码
如果是 list 或者 tuple,则返回使用 conn.escape 转换之后的 tuple, 如果是 dict 字典类型的话,则返回一个字典,key 还是原来的 key, value 为 conn.escape(val) , 最终都是使用 conn.escape() 函数来进行转换,再来看下这个函数的实现
def escape(self, obj):
""" Escape whatever value you pass to it"""
if isinstance(obj, str):
return "'" + self.escape_string(obj) + "'"
return escape_item(obj, self._charset)
def escape_string(self, s):
if (self.server_status &
SERVER_STATUS.SERVER_STATUS_NO_BACKSLASH_ESCAPES):
return s.replace("'", "''")
return escape_string(s)
复制代码
函数将在传入的字符串两边加上两个单引号 ’ , 并且将 字符串中的单引号替换成两个单引号,这样就可以避免大多的 sql 注入问题,我们修改一下脚本
username = 'yanyanxin'
async with conn.cursor() as cur:
count = await cur.execute("select * from user where username = %s", username)
if count:
r = await cur.fetchall()
for i in r:
print(i)
else:
print("no user")
复制代码
此时转换后的 SQL 语句为 select * from user where username = ‘yyx’ or 1=1#’ 已经将单引号进行了转义,此时就不会查找到用户了。
注意为了避免 SQL 注入的问题,我们一定不要自己进行拼接 SQL 语句,一定要对用户的输入进行检查转义
多参数的查询
select * from user WHERE age >19 and age<29
复制代码
我们使用 aiomysql 的实现
async with conn.cursor() as cur:
count = await cur.execute("select * from user where age>%s and age<%s", (19, 29))
if count:
r = await cur.fetchall()
for i in r:
print(i)
else:
print("no user")
复制代码
联合查询
我们再创建一个表,表示用户表中用户的职业, 创建三条数据, userid 对应于 user 表中的 id, 这里之所以没有用外键,之后再讨论,只是记住,这里的 userid 只是一个普通的列,它表示 user 表中的 id。
SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username='yyx'
复制代码
async with conn.cursor() as cur:
sql = 'SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username=%s'
count = await cur.execute(sql, ('yyx',))
if count:
r = await cur.fetchall()
for i in r:
print(i)
else:
print("no user")
复制代码
日期格式的查询
(1, 'yanyanxin', 18, datetime.datetime(2020, 10, 31, 16, 43, 8))
(2, 'yyx', 28, datetime.datetime(2020, 11, 1, 21, 44, 35))
复制代码
如果我们想要查询日期大于 2020 年 10 月 31 日的数据我们可以这样写 SQL
select * from user WHERE DATE_FORMAT(updatedate,'%Y%m%d') > '20201031'
datestr = datetime.datetime(2020, 10, 31).strftime('%Y%m%d')
count = await cur.execute("select * from user WHERE DATE_FORMAT(updatedate,'%Y%m%d') > %s", (datestr,))
复制代码
将会得到一个异常
ValueError: unsupported format character 'Y' (0x59) at index 51
复制代码
上面在转换拼接字符串的时候, 由于有个 %Y 的存在, python 默认是不支持这个转换的,所以这样写是不行的,这里其实不需要将 datetime.datetime 类型的数据进行转换,aiomysql 会自动的进行转换
datestr = datetime.datetime(2020, 10, 31)
count = await cur.execute("select * from user WHERE updatedate > %s", (datestr,))
复制代码
我们只需要将 datetime.datetime 类型的数据传到参数里即可,pymysql 内置了基本类型的处理方法
encoders = {
bool: escape_bool,
int: escape_int,
long_type: escape_int,
float: escape_float,
str: escape_str,
text_type: escape_unicode,
tuple: escape_sequence,
list: escape_sequence,
set: escape_sequence,
frozenset: escape_sequence,
dict: escape_dict,
type(None): escape_None,
datetime.date: escape_date,
datetime.datetime: escape_datetime,
datetime.timedelta: escape_timedelta,
datetime.time: escape_time,
time.struct_time: escape_struct_time,
Decimal: escape_object,
复制代码
添加数据
有了上面查询数据的基础,我们再来看下插入数据, 我们同样以正常的 mysql 语句再结合 aiomysql 中的 query 语句进行对比。
1. 插入单条语句
经过表的修改,目前我们的表字段如下
其中 id 为主键自增,新添加的时候可以不用传参数,mysql 会自动添加, username 和 age 是不能为空的,添加的时候必须要传
INSERT INTO `user` (username, age) VALUES ("aaa", 24);
复制代码
使用 aiomysql 来添加
async with conn.cursor() as cur:
count = await cur.execute("insert into user (username, age, updatedate) VALUES(%s, %s, %s)", ("ccc", 33, datetime.datetime.now()))
await conn.commit()
print(count)
if count:
r = await cur.fetchall()
for i in r:
print(i)
print("#########")
count = await cur.execute("select * from user")
if count:
r = await cur.fetchall()
for i in r:
print(i)
else:
print("no user")
复制代码
对于日期类型的数据,我们也无需进行处理,直接传入参数即可
2. 插入多条语句
async with conn.cursor() as cur:
users = [
("eee", 26, datetime.datetime(2019, 10, 23)),
("fff", 28, datetime.datetime(2018, 11, 13)),
("ggg", 27, datetime.datetime(2016, 9, 15)),
]
count = await cur.executemany("insert into user ( username, age, updatedate) VALUES(%s, %s, %s)", users)
print(count)
if count:
r = await cur.fetchall()
for i in r:
print(i)
print("#########")
count = await cur.execute("select * from user")
if count:
r = await cur.fetchall()
for i in r:
print(i)
else:
print("no user")
for arg in args:
await self.execute(query, arg)
rows += self._rowcount
self._rowcount = rows
复制代码
如何处理插入失败
插入失败常有,比如主键重复,数据类型不对等,我们需要去抓住这些异常来进行处理
count = await cur.execute("insert into user (id, username, age, updatedate) VALUES(%s, %s, %s, %s)",(1, "ddd", 34, datetime.datetime.now()))
pymysql.err.IntegrityError: (1062, "Duplicate entry '1' for key 'PRIMARY'")
async with conn.cursor() as cur:
try:
count = await cur.execute("insert into user (id, username, age, updatedate) VALUES(%s, %s, %s, %s)", (1, "ddd", 34, datetime.datetime.now()))
print(count)
except pymysql.err.IntegrityError as e:
print(e)
except Exception as e:
raise e
复制代码
cursor 类型
(1, 'yanyanxin', 18, datetime.datetime(2020, 10, 31, 16, 43, 8), 0)
(2, 'yyx', 28, datetime.datetime(2020, 11, 1, 21, 44, 35), 2)
(3, 'aaa', 24, None, None)
(8, 'ccc', 33, datetime.datetime(2020, 11, 2, 17, 59, 38), None)
(27, 'aaa', 16, None, None)
复制代码
可以使用 aiomysql.cursors.DictCursor 类初始化
conn.cursor(aiomysql.cursors.DictCursor) as cur
复制代码
获取到的结果将以字典的形式返回
{'id': 1, 'username': 'yanyanxin', 'age': 18, 'updatedate': datetime.datetime(2020, 10, 31, 16, 43, 8), 'isstudent': 0}
{'id': 2, 'username': 'yyx', 'age': 28, 'updatedate': datetime.datetime(2020, 11, 1, 21, 44, 35), 'isstudent': 2}
{'id': 3, 'username': 'aaa', 'age': 24, 'updatedate': None, 'isstudent': None}
{'id': 8, 'username': 'ccc', 'age': 33, 'updatedate': datetime.datetime(2020, 11, 2, 17, 59, 38), 'isstudent': None}
{'id': 27, 'username': 'aaa', 'age': 16, 'updatedate': None, 'isstudent': None}
复制代码
连接池的使用
之前我们一直使用 aiomysql.connect() 方法来连接到数据库,aiomysql 还提供了连接池的接口,有了连接池的话,不必频繁打开和关闭数据库连接。
上面的代码,我们都是执行一个函数就创建一个连接,我们知道,客户端在与服务端创建连接也是一个比较耗时耗资源的操作,所以我们会通过连接池来减少与 mysql 数据库的频繁打开和关闭连接。
loop = asyncio.get_event_loop()
async def test():
conn = await aiomysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
db='mytest',
loop=loop
)
async def get_user():
async with conn.cursor() as cur:
count = await cur.execute("select * from user")
if not count:
return
r = await cur.fetchall()
print("get data from user")
for i in r:
print(i)
async def get_jobs():
async with conn.cursor() as cur:
count = await cur.execute("select * from jobs")
if not count:
return
r = await cur.fetchall()
print("get data from jobs......")
for i in r:
print(i)
await asyncio.gather(get_jobs(), get_user())
loop.run_until_complete(test())
复制代码
我们在 test() 函数里写了两个子函数,get_user 和 get_jobs 分别从 user 表和 jobs 表中获取数据,当然我们可以使用
await get_user()
await get_jobs()
复制代码
来分别执行,但是这种方式是同步的,并没有异步去执行,我们想要这两个函数异步进行,所以我们使用
await asyncio.gather(get_jobs(), get_user())
复制代码
这种方式调用,让这两个协程并行执行, 但是这样写就会报错
RuntimeError: readexactly() called while another coroutine is already waiting for incoming data
复制代码
所以这里我们需要用两个不同的连接, 当然可以在每个函数中都重新对 mysql 数据进行连接,在执行完查询操作以后再关闭,但是这样就会造成之前说有频繁的创建连接会造成一些资源的浪费,同时网站的性能也会受到影响。
所以这时我们需要使用连接池,连接池会保存一定数量的连接对象,每个函数在需要使用的时候从池子中拿一个连接对象, 使用完以后再将连接对象放到池子中, 这样避免了频繁的和 mysql 数据库进行打开关闭操作,同时也避免出现上面的同个连接在不同的协程对象中使用而出现的异常。
loop = asyncio.get_event_loop()
async def test():
pool = await aiomysql.create_pool(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
db='mytest',
minsize=1,
maxsize=2,
echo=True,
autocommit=True,
loop=loop
)
async def get_user():
async with pool.acquire() as conn:
print(id(conn), 'in get user')
async with conn.cursor() as cur:
count = await cur.execute("select * from user")
if not count:
return
r = await cur.fetchall()
print("get data from user")
for i in r:
print(i)
async def get_jobs():
async with pool.acquire() as conn:
print(id(conn), 'in get jobs')
async with conn.cursor() as cur:
count = await cur.execute("select * from jobs")
if not count:
return
r = await cur.fetchall()
print("get data from jobs......")
for i in r:
print(i)
async def get_email():
async with pool.acquire() as conn:
print(id(conn), 'in get email')
async with conn.cursor() as cur:
count = await cur.execute("select * from email")
if not count:
return
r = await cur.fetchall()
print("get data from email......")
for i in r:
print(i)
await asyncio.gather(get_jobs(), get_user(), get_email())
loop.run_until_complete(test())
复制代码
连接池的初始化函数 aiomysql.create_pool 和 aiomysql.connect 参数差不多,数据库的基本信息, 这里多了两个参数 minsize,maxsize, 最少连接数和最大连接数,我这里为了实验,将最大连接数设置为 2,然后下面用了三个函数来获取连接池,我们将连接对象 conn 的 id 信息打印出来看下
2977786527496 in get jobs
2977786527496 in get user
2977786590984 in get email
复制代码
上面的脚本也不再报错,并且可以正常的获取到数据库里的信息,且都是异步的进行查询
我们也要注意一下,由于是演示代码,我们在开发过程中,不太会写这样的代码,更多的时候,我们是写 web 程序,比如用 tornado 写个 web 程序, 不同的接口需要进行不同的查询操作,为了保证查询同时进行,此时我们就需要用连接池了。
事务的处理
关于事务的介绍,网上有好多,关于数据库事务具有 ACID 这 4 个特性:原子性,一致性,隔离性,持久性以及不同的隔离级别所带来的脏读、不可重复读、幻读等问题,推荐廖雪峰的 sql 教程, 讲的很清晰。
这里介绍一下在 aiomysql 中事务的处理,
之前我们在初始化连接或者连接池的时候,都加上了 autocommit=True, 这个设置, autocommit=True
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa", 16))
# 不调用conn.commit()
c = await cur.execute("select * from user")
result = await cur.fetchall()
for i in result:
print(i)
复制代码
当然,在执行 conn.commit() 时,是有可能失败的,比如插入一半的数据,被别的事务所干扰,此时这里就会抛异常。
现在有一个问题,既然可以设置 autocommit=True 让数据库自动提交事务,我们为什么还要自己来开启事务,然后再手动调用 conn.commit() 来提交呢?
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.cursors.DictCursor) as cur:
try:
await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa", 16))
await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa2", 13, 1111))
except Exception as e:
print(e)
c = await cur.execute("select * from user")
result = await cur.fetchall()
for i in result:
print(i)
复制代码
在上面的语句中, 第一次 insert 语句没有问题,可以正常的插入数据库,但是第二个语句,由于格式转换有问题,这时会崩溃,第二条语句不会插入成功,但是现在问题就来了,我要求是这两条语句要么全执行,要么都不执行, 上面的代码没法保证数据的一致性, 破坏了事务的原子性与一致性,所以这时我们需要使用自己手工来处理事务。
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.cursors.DictCursor) as cur:
await conn.begin() # 开启事务
try:
await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa", 16))
await cur.execute("insert into user(username, age) values(%s, %s)", ("aaa2", 13, 1111))
await conn.commit()
except Exception as e:
print(e)
await conn.rollback() #回滚
c = await cur.execute("select * from user")
result = await cur.fetchall()
for i in result:
print(i)
复制代码
此时第一条语句就不会被插入成功了. 如果在初始化连接或者连接池时设置了 autocommit=True 参数,则这里需要调用 conn.begin()` ,如果没有设置 autocommit 参数则默认是 False, 后面也不用显示的调用 conn.begin(), 但是需要显示的调用 conn.commit()
Sqlalchemy 介绍
SQLAlchemy 是 Python 编程语言下的一款开源软件。提供了 SQL 工具包及对象关系映射(ORM)工具,使用 MIT 许可证发行。
SQLAlchemy“采用简单的 Python 语言,为高效和高性能的数据库访问设计,实现了完整的企业级持久模型”。SQLAlchemy 的理念是,SQL 数据库的量级和性能重要于对象集合;而对象集合的抽象又重要于表和行。因此,SQLAlchmey 采用了类似于 Java 里 Hibernate 的数据映射[4]模型,而不是其他 ORM 框架采用的 Active Record 模型。不过,Elixir[5]和 declarative 等可选插件可以让用户使用声明语法。
SQLAlchemy 首次发行于 2006 年 2 月,并迅速地在 Python 社区中最广泛使用的 ORM 工具之一,不亚于 Django 的 ORM 框架。
01
ORM 介绍
ORM, 全称 Object-Relational Mapping,将关系数据库的表结构映射到对象上, 使得操作数据库的关系转换成操作 python 中的对象
在 Aiomysql 中使用 Sqlalchemy
在使用 aiomysql 原生的 mysql 连接时,我们使用 aiomysql.connect 函数来获取 aiomysql 连接对象,在使用 sqlalchemy 时,需要使用 aiomysql.sa.create_engine 函数来创建一个引擎对象。
在 aiomysql 中,不能使用类来定义, 需要使用 aiomysql.sa.Table 来返回 ORM 对象, 也不能使用 session, 执行查询操作需要在一个连接对象上
import aiomysql
import asyncio
import logging
import pymysql
import sqlalchemy as sa
from aiomysql.sa import create_engine
loop = asyncio.get_event_loop()
metadata = sa.MetaData()
user = sa.Table(
"user",
metadata,
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column('username', sa.String(255), nullable=False, default=""),
sa.Column('age', sa.Integer, nullable=False, default=0),
sa.Column('updatedate', sa.DateTime, nullable=True),
sa.Column('isstudent', sa.Boolean, nullable=True)
)
async def test():
engine = await create_engine(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
db='mytest',
autocommit=True,
loop=loop
)
async with engine.acquire() as conn:
query = sa.select([user])
result = await conn.execute(query)
for i in await result.fetchall():
print(i)
loop.run_until_complete(test())
复制代码
1. 创建元类
使用 metadata = sa.MetaData() 创建一个元类,这个元类会包含各种表的关系,之后会介绍
2. 创建表
使用上面创建的元类 metadata 来创建表结构, 第一个字段为表名, 第二个参数为元类对象,之后为每个字段的信息对象,第一个为字段名,第二个为类型,之后会是一些字段选项
类型描述|------
以下是一些常见的字段属性
primary_key: 是否为主键
autoincrement: 是否自增
index: 是否为索引
nullable: 是否可以为空, True的时候为可以为空
comment: 注释
复制代码
4. 获取连接
通过 engine.acquire() 来获取一个连接
5. 执行查询语句
这里和 aiomysql 不一样,这里直接使用连接对象 conn 的 execute(query) 方法
6. 打印显示结果
这里调用的 SQL 语句和直接使用 SQL 或者上面使用 aiomysql 的 execute 来执行 sql 语句来讲,比较晦涩难懂, 下面详细记录一下各种查询在 sqlalchemy 中的实现
使用 Sqlalchemy 的 CURD
no.1
简单查询数据
query = sa.select([user])
result = await conn.execute(query)
SELECT "user".id, "user".username, "user".age, "user".updatedate, "user".isstudent
FROM "user"
复制代码
sa.select([user]) select() 函数参数必须是个列表或者可迭代对象,这个简单的查询不用提供 from 表, sa 会自动算出需要在哪张表中查询
no.2
选择哪些返回列
query = sa.select([user.columns.updatedate, user.c.username])
复制代码
no.3
带条件的查询
可以在 select() 函数后面加上调用 where() 函数来设置查询条件
query = sa.select([user]).where(user.columns.username == "yyx")
复制代码
在返回值中,由于我们在定义 user 的时候,isstudent 字段我们设置的是 sa.Boolean ,这时,当值为 0 时该值为 False, 非 0 时为 True
上面的打印输出为 (2, ‘yyx’, 28, datetime.datetime(2020, 11, 1, 21, 44, 35), True)
2. 防注入
query = sa.select([user]).where(user.columns.username == "yyx' or 1=1#")
复制代码
得到的 query 语句为
SELECT user.id, user.username, user.age, user.updatedate, user.isstudent
FROM user
WHERE user.username = 'yyx\' or 1=1#'
复制代码
3. 多条件查询
有时我们会使用多个条件查询, 比如我们要查找 age 大于 24, id 小于 11 的用户信息。
逻辑查询关系可以分为或(or)且(and)非(not)的关系,我们可以使用 sqlalchemy.sql 中的 and_, or_, not_ 来指定逻辑关系.注意这里为了和 python 中的关键字作为区分都有一个下划线。
async with engine.acquire() as conn:
w = and_(
user.columns.id < 11,
user.columns.age > 14
)
query = sa.select([user]).where(w)
print(str(query))
result = await conn.execute(query)
for i in await result.fetchall():
print(i)
SELECT user.id, user.username, user.age, user.updatedate, user.isstudent
FROM user
WHERE user.id < 11 AND user.age > 14
复制代码
4. 日期查询
我们要查询 updatedate 大于 2020-11-02 的用户信息
query = sa.select([user]).where(user.columns.updatedate>datetime.datetime(2020, 11, 2))
SELECT user.id, user.username, user.age, user.updatedate, user.isstudent
FROM user
WHERE user.updatedate > '2020-11-02 00:00:00'
复制代码
5. False 查询与 None 查询
query = sa.select([user]).where(user.columns.isstudent==False)
(1, 'yanyanxin', 18, datetime.datetime(2020, 10, 31, 16, 43, 8), False)
复制代码
但是表中还有没有设置该字段的数据是查不到的,这里的 False 和 None 是不一样的,如果想要获取到没有设置 isstudens 字段的数据需要使用
query = sa.select([user]).where(user.columns.isstudent==None)
复制代码
no.4
插入操作
sa 的插入操作很灵活,有好多种插入方法,下面依次进行试验
await conn.execute(user.insert().values(username="fan", age=16))
userinfo = {"username": "hhh","age": 33,"id": None,"updatedate":None,"isstudent":None}
result = await conn.execute(user.insert(), userinfo)
result = await conn.execute(user.insert(), (None, "yang", 88, None, True))
result = await conn.execute(user.insert(), id=None, username="lllll", age=99,
updatedate=datetime.datetime.now(), isstudent=True)
result = await conn.execute(user.insert(), None, "mmmm", 9, None, None)
复制代码
no.5
复杂的查询 join
SELECT jobs.jobs, user.username from jobs INNER JOIN user ON user.id=jobs.userid where user.username='yyx'
复制代码
在 sa 中,我们需要使用 select_from 函数来定义 JOIN
# 定义jobs表结构
jobs = sa.Table(
'jobs', metadata,
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("jobs", sa.String(50), nullable=False, default="qa"),
sa.Column("userid", sa.Integer, nullable=False)
)
async with engine.acquire() as conn:
j = user.join(jobs, user.c.id == jobs.c.userid)
query = sa.select([user.c.username, jobs.c.jobs]).select_from(j).where(user.c.username == 'yyx')
result = await conn.execute(query)
for i in await result.fetchall():
print(i)
SELECT user.username, jobs.jobs
FROM user INNER JOIN jobs ON user.id = jobs.userid
WHERE user.username = 'yyx'
复制代码
no.6
use_labels 问题
j = user.join(jobs, user.c.id == jobs.c.userid)
query = sa.select([user, jobs]).select_from(j).where(user.c.username == 'yyx')
复制代码
我们想要获取 user 和 jobs 的所有字段,此时会报错
aiomysql.sa.exc.InvalidRequestError: Ambiguous column name 'id' in result set! try 'use_labels' option on select statement.
复制代码
这是由于 user 和 jobs 表中都有 id 这个字段,返回的话将无法确定是谁的,需要使用 use_labels 参数,
query = sa.select([user, jobs], use_labels=True).select_from(j).where(user.c.username == 'yyx')
复制代码
上面的结果返回为
(2, 'yyx', 28, datetime.datetime(2020, 11, 1, 21, 44, 35), True, 2, 'qa', 2)
复制代码
no.7
获取返回值字段属性
上面的结果是一个元组,我们还可以打印指定的字段
for i in await result.fetchall():
print(i.username, i.jobs)
复制代码
如果加了 use_labels=True 时,也需要添加上表名, 表名_字段
for i in await result.fetchall():
print(i.user_username, i.jobs_jobs)
复制代码
是否需要使用外键
上面无论是使用 aiomysql 还是使用 sa,都没有使用外键进行约束,关于是否使用外键,业内有两种不同的意见,支持使用的会认为,人为的写程序难免会有 bug, 会有不注意的地方,就好比 jobs 表中插入了一个 userid 为 100 的数据,但是 userid 为 100 的用户并没有在 user 表中,这时如果使用外键约束,则插入会失败. 在 mysql 数据库的层面上对数据一致性增加了一层保障。
但是反对使用外键的人认为,这样会增加数据库本身的负担,数据的一致性正确性应该由开发人员来保障,数据库有了外键的约束在处理数据速度上会受到影响。
业内现在大多数公司已经不使用外键了,甚至在数据库层面上已经将该功能禁掉以保障数据库的速度,所以我们在以后的开发中,也尽量的少使用甚至不使用外键,当然,这个也看业务,但是如果公司将 mysql 的外键都禁掉的话就只能人为的来保障数据的正确性了。
数据库重连问题
有时候会出现这种情况,数据库偶尔的宕机或者网络抖动,造成了程序与数据库连接断了, 此时,当网络恢复了,正常来讲我们不希望再重启的我们的 web 服务,而是程序会自动的进行重新连接。
loop = asyncio.get_event_loop()
async def test():
conn = await aiomysql.connect(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
db='mytest',
loop=loop
)
while True:
try:
async with conn.cursor(aiomysql.cursors.DictCursor) as cur:
c = await cur.execute("select * from user where id = 1")
result = await cur.fetchall()
for i in result:
print(i)
except:
pass
finally:
await asyncio.sleep(1)
loop.run_until_complete(test())
{'id': 1, 'username': 'yanyanxin', 'age': 18, 'updatedate': datetime.datetime(2020, 10, 31, 16, 43, 8), 'isstudent': 0}
{'id': 1, 'username': 'yanyanxin', 'age': 18, 'updatedate': datetime.datetime(2020, 10, 31, 16, 43, 8), 'isstudent': 0}
2020-11-03 18:24:31,206 - asyncio - WARNING - C:\Python37\lib\asyncio\selector_events.py[:863] - socket.send() raised exception.
....
....
....
loop = asyncio.get_event_loop()
async def test():
pool = await aiomysql.create_pool(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
db='mytest',
loop=loop
)
while True:
try:
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.cursors.DictCursor) as cur:
c = await cur.execute("select * from user where id = 1")
result = await cur.fetchall()
for i in result:
print(i)
except:
pass
finally:
await asyncio.sleep(1)
loop.run_until_complete(test())
import aiomysql
from functools import wraps
def mysql_connection_check(func):
@wraps(func)
async def wrapper(*args, **kwargs):
mysql = args[0]
if mysql:
if not mysql.isconnect:
# 进行重连
await mysql._lock.acquire()
try:
await mysql.restart()
except:
print(traceback.format_exc())
finally:
await mysql._lock.release()
try:
return await func(*args, **kwargs)
except (OperationalError, ConnectionResetError, OSError):
mysql.isconnect = False
except Exception as e:
print(traceback.format_exc())
return wrapper
class PMysql:
'''
对于aiomysql进行封,实现自动重连功能
'''
def __init__(self, host, user, password, db, port=3306, **kwargs):
'''
:param host:
:param user:
:param password:
:param db:
:param port:
:param kwargs: minsize=1, maxsize=10,echo=False
'''
self.isconnect = False
self.host = host
self.user = user
self.password = password
self.db = db
self.port = port
self.kwargs = kwargs
self._lock = asyncio.Lock()
self._pool = None
self.isconnect = False
async def init_pool(self):
try:
self._pool = await aiomysql.create_pool(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
db=self.db,
**self.kwargs
)
self.isconnect = True
except:
self.isconnect = False
async def close(self):
try:
if self._pool:
self._pool.close()
await self._pool.wait_closed()
self._pool = None
self.isconnect = False
except:
print("close error", traceback.format_exc())
self.pool = None
self.isconnect = False
async def restart(self):
print("will restart connect..... ")
await self.close()
await self.init_pool()
@mysql_connection_check
async def execute(self, query, args=None):
'''
执行execute语句
:param query:
:param args:
:return:
'''
async with self._pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, args)
return cur
loop = asyncio.get_event_loop()
async def test():
t = PMysql(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
db='mytest',
autocommit=True,
minsize=1,
maxsize=2,
loop=loop)
await t.init_pool()
while True:
try:
cur = await t.execute("select * from user where id = %s", 1)
for i in await cur.fetchall():
print(i)
except:
pass
finally:
await asyncio.sleep(1)
@mysql_connection_check
async def get_a_conn(self):
return await self._pool.acquire()
@mysql_connection_check
async def releaseconn(self, conn):
return await self._pool.release(conn)
@mysql_connection_check
async def get_a_cursor(self, conn):
return await conn.cursor()
@mysql_connection_check
async def release_a_cur(self, cur):
await cur.close()
@mysql_connection_check
async def transaction(self, conn):
await conn.begin()
@mysql_connection_check
async def commit(self, conn):
await conn.commit()
@mysql_connection_check
async def rollback(self, conn):
await conn.rollback()
@mysql_connection_check
async def execute(self, query, args=None):
'''
执行execute语句
:param query:
:param args:
:return: 游标
'''
async with self._pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(query, args)
return cur
@mysql_connection_check
async def executemany(self, query, args=None):
async with self._pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.executemany(query, args)
return cur
loop = asyncio.get_event_loop()
async def test():
t = PMysql(
host='127.0.0.1',
port=3306,
user='root',
password='123456',
db='mytest',
autocommit=True,
minsize=1,
maxsize=2,
loop=loop)
await t.init_pool()
conn = await t.get_a_conn()
cur = cur = await t.get_a_cursor(conn)
try:
await t.transaction(conn)
await cur.execute("insert into user (username, age) values(%s, %s)", ("xxx", 11))
await cur.execute("insert into user (username, age) values(%s, %s)", ("xxx", 11, 333))
print(cur.lastrowid)
await t.commit(conn)
except:
await conn.rollback()
finally:
if cur:
await t.release_a_cur(cur)
if conn:
await t.releaseconn(conn)
复制代码
是否使用 Sqlalchemy
通过上面的介绍,想必大家也可以看出,sa 在代码的可读性方面似乎没有直接 SQL 语句好,但是 sa 的存在意义在于,你现在使用的是 MySQL, 没准哪天项目需要迁移到 oracle 或者 sqlite,这时你几乎不用修改什么代码就可以顺利的完成迁移,如果直接使用 SQL 语言你就需要修改大量的代码.再者 ORM 会在底层对查询做一些转换,像之前提到的注入问题,如果有手写 SQL 时难免会写出漏洞来。但是我们做项目,又很少能过遇到修改数据库的情况,所以是否要使用 sa,看各位的需求!
更多学习资料戳下方!!!
https://qrcode.ceba.ceshiren.com/link?name=article&project_id=qrcode&from=infoQ×tamp=1662366626&author=xueqi
评论