对信用卡欺诈 Say No!百行代码实现简化版实时欺诈检测
进入互联网时代,你的绝大部分操作都可以在网上进行,极大的方便了我们的生活。但是信用卡盗刷者也可以利用网络来诈骗,典型的做法是:诈骗者首先入侵安全级别较低系统来盗窃信用卡卡号,用盗得的信用卡进行很小额度的消费进行测试,如果测试消费成功,那么他们就会用这个信用卡进行大笔消费,来购买那些可以倒卖的财物,实现诈骗敛财的目标。
大部分银行都有针对信用卡诈骗的反欺诈检测系统,通过对诈骗模式进行识别,及时通知用户或者直接冻结账户,来避免进一步损失。flink的入门介绍文档中就展示一种信用卡诈骗检测的实现方式,但是数据来源是一个静态数组,不符合实际用户场景。本文将介绍一种方案,通过少于 100 行代码修改该实例程序,实现基于 Oracle 上账户表变更的实时欺诈检测。
1.传统实时欺诈检测方案分析
Flink示例程序会检测每一笔交易,若发现一个帐户在 1 分钟内,先出现了一笔小交易(小于 1),后面又出现了一笔大交易(大于 500),则认为出现了欺诈交易,立即输出警告。具体的代码解析可以阅读基于DataStream API 实现欺诈检测
但是,示例程序中的数据来源 TransactionSource 的数据来源是一个静态数组 private static List<Transaction> data = Arrays.asList(new Transaction(1L, 0L, 188.23D)...的迭代器。一般情况下,客户的余额是存储在 Oracle 的账户表中的。怎么将客户余额的变化输出到 Flink 中,来实现实时的欺诈检测列?能想到的方案列举如下:
方案 1:轮询从 Oracle 账户表查询余额变更 应用程序固定时间间隔去轮询 Oracle 账户表的数据,检查到某个客户的账户余额发生了变化后,通知 Flink 进行欺诈检测。这种方案需要不断轮询 Oracle 数据库,对有数据库性能影响,并且就算轮询的间隔足够短,还是有可能漏掉了一些账户变更信息,不可取。
方案 2:业务代码修改 Oracle 账户表时通知 Flink 修改交易程序,在它去更新 Oracle 账户表时,通知 Flink 进行欺诈检测。这种方案的优势在于不会丢掉任何账户变更的事件;但是需要修改交易程序,会导致业务程序耦合度提升。实现上如果采用同步模式,可能会由于 Flink 失败而导致交易失败,也会大大提高交易持续时间;而采用异步方式,需要考虑通知 Flink 和写入账户表的原子性,有可能成功通知了 Flink 但是写入账户表失败了,也有可能写入账户表成功了,却没有通知到 Flink。
方案 3:利用 logminer 抽取账户表变更 Oracle 提供 logminer 来将数据库日志反解析成变更 SQL,这样就可以将 Oracle 账户表更新的信息抽取出来,通知 Flink 进行欺诈检测。这种方案的优点在于直接基于 Oracle 数据表的修改来做增量的同步(oracle 日志中记录账户表修改并提交了,说明客户修改账户是成功的,不用担心 Flink 通知了,账户表反而写失败了),降低了业务的耦合度,也不会担心丢失了账户变更事件;但是 logminer 每次只能挖掘一整个日志的变化,没法断点续传,并且挖掘的数据也只能写入 alert.log,会污染错误日志。这个方案缺陷也比较大。
上述三个方案都有一定的缺陷和问题,要么可能会漏掉部分变更数据,要么可能影响 oracle 性能。logminer 相对来说是避免漏数据,对数据库性能影响最小的方案,是否有一个类似于 logminer 而且支持断点续传,对 Flink 又比较友好的方案?
方案 4:利用 OGG 抽取账户表变更 Oracle 公司的 OGG 和国内部分厂商基本能避免 logminer 的缺点,但是需要在 Oracle 服务器上安装客户端,有侵入,并且配置和使用比较复杂,价格上也不是很友好。
最后我们找到了一个轻量、免费日志解析工具QDecoder来替代 OGG,将 oracle 账户表的变更通知到 Flink,从而实现欺诈检测的方法。官方介绍:QDecoder 是沃趣科技自主研发,基于 Oracle redo 日志进行二进制解析的订阅同步工具,易集成、零侵入、高性能、全免费。目前,QDecoder 已经在多家证券和银行上线使用,稳定运行,得到诸多客户的肯定与认可。
2.Oracle 源端增量输出
期待实现的实时欺诈检测的架构:
如图:
QDecoder 实时获取 Oracle 的 redo log,将账户表数据变更解析出来,写入 kafka 的指定 topic 中。
Flink 程序(FraudDetection)使用 flink-connector-kafka 从 kafka 获取交易数据,进行流式计算,识别出可能的欺诈交易,并输出警告。
2.1 安装 QDecoder
为了从 Oracle 的日志中挖掘出 account 表的变更数据,我们需要安装 QDecoder。QDecoder 的安装也非常简单,并且它跟其他的数据库同步软件不一样,它默认是不需要到 Oracle 服务器上去部署客户端的,只需要给定 ASM 账号就可以通过网络连接到 Oracle 上取日志。
一键安装命令如下: docker run -it --name=qdecoder -p 9191:9191 -p 9092:9092 --pull always registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder
根据提示配置 QDecoder,更多信息可参考 https://hub.docker.com/r/woqutech/qdecoder
以下配置需要特别注意:
配置项 1.1 中列出的 sql,请以 dba 权限在 oracle 中执行,这将配置 QDecoder 查询系统表需要的权限。
配置项 2.1: 输入将要检测的表:qdecoder.account。简单起见,我们这里把 account 示例表新建在 qdecoder 账户下了。
conn qdecoder/qdecoder;create table account(accountid int primary key, balance number);
配置项 3.1: 选择输出到 kafka, bootstrap.servers 可以不输入,直接在容器中启动 kafka,qdecoder 会将变更数据写入“
defaultapp.qdecoder.binlog.qdecoder
”的 topic。生产环境下如果有 kafka 集群,请输入集群连接的地址和账号。
下面是配置示例:
2.2 更新账户表观察 QDecoder 的输出
等 QDecoder 成功启动后,可以按照提示运行 binlogdumpK,从 kafka 读取 binlog 并打印出来。由于 account 表并没有更新,此时没有变更数据输出。
我们来更新 account 表:
binlogdumpK 输出的 binlog 如下图:
上图中 schemaName: "qdecoder",tableName: "account", eventType: INSERT 表示这里是对qdecoder.account
表的 INSERT 操作。其实它对应的 sql 就是insert into qdecoder.account(accountid,balance) values(1,10000)
。
这里
Oracle 的日志解析出来是遵循阿里巴巴canal的 protobuf 格式,这个是关系型数据库增量输出的标准格式
binlogdumpK 只是为了观察一下 QDecoder 的输出,你可以随时关掉它,这并不影响 QDecoder 和 Flink 程序的运行。
至此,我们已经利用 QDecoder 从 Oracle 的日志解析出账户表的数据变更,那么,怎么将这些输出作为 Flink 现有的欺诈检测的输入源列呢?
3.实现 Kafka Consumer 对接 Oracle 源端增量数据
Flink示例程序是利用 addSource(new TransactionSource())来将静态数组作为源加入流处理的
我们要修改为从 kafka 中取日志,可以利用 DataStream Connectors 配置一个 Kafka Consumer。按照Flink自带的示例中的DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));
添加数据源。这里
"topic"应该修改为"
defaultapp.qdecoder.binlog.qdecoder
",这是 QDecoder 插入 kafka 的 topic 名称properties 修改为你连接的 kafka 的地址,我们这里使用的是 QDecoder 自带的 kafka,修改为 127.0.0.1:9092
SimpleStringSchema()
是直接以 string 的方式输出出来,QDecoder 的输出是 protobuf 的,不能直接用 string 的方式输出,而需要解析出来转换成欺诈检测认识的 Transaction
3.1 binlog 转换成 Transaction 实现
为了能完全重用 Flink 示例程序中的代码,我们这里需要
使用 com.alibaba.otter.canal.protocol 反序列化 QDecoder 插入到 kafka 的 binlog 日志
将 binlog 日志中的账户表变更转换成 Transaction 对象。
我们实现一个BinlogTransactionSchema类反序列化 binlog, 计算 balance 的变化,生成 org.apache.flink.walkthrough.common.entity.Transaction 对象。
主要代码如下:
3.2 将数据源加入 Flink
QDecoder 输出的增量数据转换后,可以直接作为数据源形成 DataStream
修改后代码如下:
3.3 验证运行
修改好的应用程序我们已经上传到 github 上供下载,测试运行步骤如下:
3.3.1 程序下载
3.3.2 程序运行
frauddetection 是一个 maven 创建的项目,有 pom.xml 项目文件,可以导入各种 IDE,进行调试和运行。这里只介绍大家常用的 intellij IDEA 运行验证方法
打开项目 开始界面:open or import -> 选择 frauddetection 目录 或者 菜单: file/open -> 选择 frauddetection 目录
运行程序 菜单: run -> run 'FraudDetectionJob'
注意:如果报告 slf4j 重复,且有大量的 log 输出,请在 module/dependencies 中删除 ch.qos.logback:logback-classic 和 ch.qos.logback:logback-core。
3.3.3 欺诈验证检测
QDecoder 和 frauddetection 正常运行以后,我们就可以更新 account.balance,模拟交易,来观察 frauddetection 程序是否能进行欺诈检测了。
在 Oracle 上执行以下 SQL
这里我们模拟了两个账号 account=1/2 的交易。其中 account=2 先入账 100 元,然后扣了 200 元是正常交易,不满足欺诈检测条件。account=1 的账户余额先扣了 0.1 元,然后再扣了 0.2 元,最后直接扣了 501 元,满足欺诈检测的条件:在一分钟内(private static final long ONE_MINUTE = 60 * 1000;)先做小于 1 元(private static final double SMALL_AMOUNT = 1.00)的小额交易,然后再做 500 以上的大额交易(private static final double LARGE_AMOUNT = 500.00;)。
检查欺诈交易是否被正确识别
执行完上述 SQL,frauddetection 程序会立即输出:
表示 accountid=1 的帐号检测到欺诈交易。
至此,我们的 frauddetection 欺诈检测程序就修改完成了。
回顾整个过程:
利用 QDecoder 实现了 Oracle 数据库的日志订阅增量数据导出
利用 alibaba canal 的 protobuf 来解析 Oracle 增量变化
实现了不到 100 行代码的 BinlogTransactionSchema 类,用于将解析数据转换为欺诈检测识别的数据
利用 Flink 的 DataStream Connectors 从 kafka 取出增量变化数据
利用了 Flink 实现增量流数据的有状态计算分布式处理,实现欺诈检测
4.总结
综上所述,我们利用 QDecoder 和 Flink 写了少于 100 行的代码,实现了一个简化版的银行信用卡欺诈检测程序。整体来说,利用 Flink+QDecoder 可以很容易将 oracle 的增量变化取出来,同步给大数据平台或者数据湖,有助于将静态数据流动起来,帮助企业盘活数据资产,提升运营决策效果。
版权声明: 本文为 InfoQ 作者【沃趣科技】的原创文章。
原文链接:【http://xie.infoq.cn/article/a449cf1348db6c160b6f8bc3f】。文章转载请联系作者。
评论