前言
相信大家对接口自动化已经不陌生了,这是几乎我们每个迭代都会投入的事情,但耗费了这么多精力去编写和维护,实际的收益如何呢?如果收益不好,是不是说明我们自动化 case 的实现方式、使用方式还有改进的地方呢?以下是接入得物接口自动化平台后的一些实践和想法,欢迎大家积极交流~
浅谈接口自动化
1.1 使用场景 &可以带来的效果
在接入自动化平台前,我们只能本地拉取代码->执行用例,所以执行者也只有测试人员。接入平台后,通过宣导 or 分享,开发可以方便的找到需要的用例(用例模块和标题需描述清晰),从而帮助他们造数或自测。
对于一些核心场景,即使业务迭代,通常结果也不会发生太大变化,这一类的场景 case 如果设计地较为稳定(当然这里的稳定不是只校验 code=200 就行),可以分享给开发用于自测,根据开发同学使用后的反馈,他们自测简单了许多,也有帮助他们发现过问题。
另外有一些本迭代内的新增接口,在接口评审完成后,我们可以提前编写好,根据具体情况决定是先保证接口状态的正常,后续再补充数据逻辑的校验,还是直接先把 case 写好。因为很多时候开发自测都只是调用本地代码,提测后连接口都调不通,如果提测前可以先进行基本的校验,就能减少冒烟测试被阻塞的概率。
_冒烟测试:_针对改动点挑出涉及的接口 case,再加上 P0 级别 case,提测后先执行一遍看看是否正常,如果核心链路异常,阻塞了后续测试,就可以直接打回了。
_验证 bug:_有些复杂场景,测试链路较长,测试数据准备又很困难,很容易出现 bug,而出现 bug 也就算了,偏偏改一遍还不一定能改好...这时候自动化的价值就体现了,把这些场景利用自动化实现,验证 bug 时直接一键执行就能得出结果,大大节省了时间,同时也稳定了自己濒临暴躁的情绪。
_回归测试:_在每次的 bvt 测试、覆盖率跟进中,有些 case 可能并不涉及本次需求改动范围,场景又比较简单基础,我们就可以利用自动化去覆盖。执行通过,视具体情况可以简单看一眼或者不再回归。
虽然我们现在有了造数平台,但实现起来有一定的成本,一些场景可能除了自己没有别的业务方有造数需求,并且场景很简单,只需调个接口,改个数据表就行,那么最快的造数方法就是自动化脚本。现在有了自动化平台,我们可以更好地分享给有造数需求的开发、产品、测试。
当然,以上效果的前提是我们的自动化 case 比较稳定,不能每次执行都一堆不通过,这样时间都耗费在排查问题上了,效果会大打折扣,别人也不会再愿意使用。
1.2 什么时间去写自动化 case
通常一部分同学会在用例评审结束,开发提测之前进行 case 编写,此时需要实现自动化的场景已经明确,基本上涉及的接口和出入参都已确定,自动化 case 的大致框架就形成了。这时候实现自动化,就可以最大化地发挥其价值,在上述涉及到的几个场景都能投入使用。如果因为时间不够或接口尚未明确,可以先梳理好需要实现自动化的场景步骤,在提测后一边手动执行用例一边补充接口参数和校验点。针对级别较低的接口场景,也可以放在版本结束后再实现,只是效果会降低一些。
1.3 自动化维护成本太高怎么办
我们维护的 case 一般有两种,一是自己写的,二是别人写的。自己写的,含着泪也要日常维护。别人写的,由于大家的编码风格千差万别,在接入自动化平台前,维护起来简直困难重重,当我们为了通过率去推进 case 更新时,往往这一类的难以推进。现在接入了平台,基本上统一了 case 模板,当因为需求变动需要更新时,有时只需要修改出入参和断言即可,一定程度上已经降低了维护成本。
另外,当 case 经常报错时,可以看看设计上是否能优化。有些依赖性强的数据,是否可以通过其他手段让这部分数据稳定下来。比如发优惠券的场景,前提需要一张有效的券,那我们在发券前可以先获取一张有效的券信息,或者在发券前先创建一张券,发完券后如果需要对券信息进行校验,也通过变量的方式。针对单个测试点实现自动化时,可以尽可能地与其他测试点解藕,充分利用前置脚本,通过修改数据表等方式较少依赖。case 中也可以设置失败重试次数,减少由于环境不稳定等原因造成的失败。
在自动化平台上的实践
2.1 场景 case 的编写
举个例子:“得物 App 新客人群领取优惠券并触发金额膨胀,多次触发膨胀应该只有一次膨胀成功”。
这个 case 在迭代中提高了测试效率,并且在后续需求变更时,帮助开发自测,解决造数问题并发现了 bug。
import json
import requests
from util.db_mysql import DBMySQL
from util.db_redis import DbRedis
def call(env_vars, g_vars, l_vars, sys_funcs, asserts, logger, **kwargs):
userId = l_vars.get('userId')
n = int(userId)%4
dbA = DBMySQL(env_vars.get("db.A"))
dbB = DBMySQL(env_vars.get("db.B"))
try:
sql_1 = "SELECT * FROM table_A WHERE user_id = %s;"%userId
# 领券后,用户领券状态校验
user_coupon_info = dbA.select(sql_1)
logger.info(newbie_res)
asserts.assertEqual(user_coupon_info[0].get("status"), 1, msg="数据表领券状态为true")
asserts.assertEqual(user_coupon_info[0].get("type"), 0, msg="当前券类型为0")
asserts.assertIsEmpty(user_coupon_info[0].get("coupon1"), msg="无资产1")
asserts.assertIsEmpty(user_coupon_info[0].get("coupon2"), msg="无资产2")
asserts.assertIsEmpty(user_coupon_info[0].get("coupon4"), msg="无资产4")
asserts.assertIsNotEmpty(user_coupon_info[0].get("info"), msg="券包信息非空")
#获取用户分组,确定用户是命中了实验组的
group = user_coupon_info[0].get("group")
asserts.assertNotEqual(group, 0, msg="用户命中对照组,无膨胀券")
#获取膨胀资产配置
sql_2 = "SELECT * FROM table_B WHERE id = 50%s and deleted=0"%group
logger.info("sql_2:"+sql_2)
coupon_config = dbA.select(sql_2)
logger.info("coupon_config:"+coupon_config)
content = json.loads(coupon_config[0].get("content_info"))
for i in range(3):
activityId = content[i]["activityId"]
l_vars.set('activityId_{}'.format(i+1), activityId)
# 优惠券表校验
sql_3 = "SELECT * FROM a_coupon_%s WHERE user_id = %s and activity_id = %s;"%(n,userId,activityId)
logger.info("sql_3:"+sql_3)
coupon_res = dbB.select(sql_3)
logger.info("coupon_res:"+coupon_res)
if(i==0):
asserts.assertIsEmpty(coupon_res, msg="未到账资产1")
if(i==2):
asserts.assertIsNotEmpty(coupon_res, msg="到账资产3")
finally:
dbA.close()
dbB.close()
复制代码
import json
import requests
from util.db_mysql import DBMySQL
from util.db_redis import DbRedis
def call(env_vars, g_vars, l_vars, sys_funcs, asserts, logger, **kwargs):
call_param = sys_funcs.get_call_param()
userId = call_param.get('userId')
activityId = call_param.get('activityId')
dbB = DBMySQL(env_vars.get("db.B"))
if not userId:
user_var = l_vars.get(call_param.get('var_userId'))
userId = user_var
if not activityId:
activityId_var = l_vars.get(call_param.get('var_activityId'))
activityId = activityId_var
if not userId and not activityId:
raise '请传入查询条件'
try:
if not activityId:
sql = "SELECT * FROM a_coupon_%s WHERE user_id = %s;"%(int(userId)%4,userId)
elif not userId:
sql = "SELECT * FROM a_coupon_%s WHERE activity_id = %s;"%(n,activityId)
else:
sql = "SELECT * FROM a_coupon_%s WHERE user_id = %s and activity_id = %s;"%(int(userId)%4,userId,activityId)
logger.info(sql)
res = dbB.select(sql)
logger.info(res)
l_vars.set("select_tableB_res",res)
except Exception as e:
logger.info(f'查询失败【{str(e)}】')
raise e
finally:
dbB.close()
return res
复制代码
import json
import requests
def call(env_vars, g_vars, l_vars, sys_funcs, asserts, logger, **kwargs):
select_tableB_res = l_vars.get('select_tableB_res')
asserts.assertIsNotEmpty(select_tableB_res, msg="到账资产1")
复制代码
import json
import requests
def call(env_vars, g_vars, l_vars, sys_funcs, asserts, logger, **kwargs):
select_tableB_res = l_vars.get('select_tableB_res')
asserts.assertEqual(len(select_tableB_res),1,msg="只到账资产1一张")
复制代码
import json
import requests
from util.db_mysql import DBMySQL
from util.db_redis import DbRedis
def call(env_vars, g_vars, l_vars, sys_funcs, asserts, logger, **kwargs):
call_param = sys_funcs.get_call_param()
userId = call_param.get('userId')
activityId = call_param.get('activityId')
dbA = DBMySQL(env_vars.get("db.A"))
if not userId:
user_var = l_vars.get(call_param.get('var_userId'))
userId = user_var
if not activityId:
activityId_var = l_vars.get(call_param.get('var_activityId'))
activityId = activityId_var
if not userId and not activityId:
raise '请传入查询条件'
try:
if not activityId:
sql = "SELECT * FROM a_coupon_%s WHERE user_id = %s;"%(int(userId)%4,userId)
elif not userId:
sql = "SELECT * FROM a_coupon_%s WHERE activity_id = %s;"%(n,activityId)
else:
sql = "SELECT * FROM a_coupon_%s WHERE user_id = %s and activity_id = %s;"%(int(userId)%4,userId,activityId)
logger.info(sql)
res = dbA.select(sql)
logger.info(res)
l_vars.set("select_tableA_res",res)
except Exception as e:
logger.info(f'查询失败【{str(e)}】')
raise e
finally:
dbA.close()
return res
复制代码
2.3 测试计划的执行
配置平台用例计划,选择依赖应用,按照自己的需要选择执行频次。然后再编辑计划,配置匹配规则,可以看到关联的自动化用例。
在用例平台绑定自动化 case,在转测单平台添加自动化计划,已关联的用例在执行结束后会自动更新执行状态,提高手动执行的效率。
平台编写 case 的常用方法
3.1 查询 DB 数据库
在环境变量中配置数据库连接信息
在脚本中对数据表进行查询
import json
import requests
from util.db_mysql import DBMySQL
from util.db_redis import DbRedis
def call(env_vars, g_vars, l_vars, sys_funcs, asserts, logger, **kwargs):
call_param = sys_funcs.get_call_param()
userId = call_param.get('userId')
activityId = call_param.get('activityId')
dbA = DBMySQL(env_vars.get("db.A"))
if not userId:
user_var = l_vars.get(call_param.get('var_userId'))
userId = user_var
if not activityId:
activityId_var = l_vars.get(call_param.get('var_activityId'))
activityId = activityId_var
if not userId and not activityId:
raise '请传入查询条件'
try:
if not activityId:
sql = "SELECT * FROM a_coupon_%s WHERE user_id = %s;"%(int(userId)%4,userId)
elif not userId:
sql = "SELECT * FROM a_coupon_%s WHERE activity_id = %s;"%(n,activityId)
else:
sql = "SELECT * FROM a_coupon_%s WHERE user_id = %s and activity_id = %s;"%(int(userId)%4,userId,activityId)
logger.info(sql)
res = dbA.select(sql)
logger.info(res)
l_vars.set("select_tableA_res",res)
except Exception as e:
logger.info(f'查询失败【{str(e)}】')
raise e
finally:
dbA.close()
return res
复制代码
3.2 获取应用 ip 地址作为 host 域名
3.3 一个 case 下多个随机账号切换请求
import json
import requests
def call(env_vars, g_vars, l_vars, sys_funcs, asserts, logger, **kwargs):
l_vars.set("user1",l_vars.get("sys.public.login.headers"))
复制代码
import json
import requests
def call(env_vars, g_vars, l_vars, sys_funcs, asserts, logger, **kwargs):
l_vars.set("sys.public.login.headers", l_vars.get("user1"))
复制代码
使用平台时遇到的一些问题
4.1 查询 redis,返回的数据带 b'
解决方法一:不使用平台的工具,代码如下:
import redis
redisConn = redis.Redis(host='redis.host', port=666, password='test123',db=1, decode_responses=True)
复制代码
解决方法二:redis 平台工具返回是数据是 bytes 类型,需要 encoding 一下
re = DbRedis.ger_redis(link_info)
test = re.get(test_key)
test_str = test.decode(encoding='utf-8')
key = key+test_str
re.set(key,"aaa")
复制代码
4.2update、insert、delete 语句执行成功,数据库却未生效
解决方式:需要 db.commit() ,select 语句不需要该语句
dbA = DBMySQL(db_A)
sql = "INSERT INTO t(name,age) VALUES (%s, %s);"
try:
res = db.insert(sql,['lucy', 18])
db.commit()
finally:
dbA.close()
复制代码
备注:delete 方式,删除数据量是 0.会有 error。
4.3http 组件 json 请求体中有中文,运行报错
解决方式:请求头配置 application/json;charset=UTF-8
总结
接入自动化平台后,方便了很多,也还有更多的使用场景待探索和交流。自动化最主要的目的是提效,时间节省下来后我们可以有更多的时间去思考异常场景以及复杂场景,做一些探索测试,减少因为用例设计遗漏而发生的问题。
评论