写点什么

对信用卡欺诈 Say No!百行代码实现简化版实时欺诈检测

作者:沃趣科技
  • 2022 年 3 月 01 日
  • 本文字数:5471 字

    阅读完需:约 18 分钟

对信用卡欺诈 Say No!百行代码实现简化版实时欺诈检测

进入互联网时代,你的绝大部分操作都可以在网上进行,极大的方便了我们的生活。但是信用卡盗刷者也可以利用网络来诈骗,典型的做法是:诈骗者首先入侵安全级别较低系统来盗窃信用卡卡号,用盗得的信用卡进行很小额度的消费进行测试,如果测试消费成功,那么他们就会用这个信用卡进行大笔消费,来购买那些可以倒卖的财物,实现诈骗敛财的目标。


大部分银行都有针对信用卡诈骗的反欺诈检测系统,通过对诈骗模式进行识别,及时通知用户或者直接冻结账户,来避免进一步损失。flink的入门介绍文档中就展示一种信用卡诈骗检测的实现方式,但是数据来源是一个静态数组,不符合实际用户场景。本文将介绍一种方案,通过少于 100 行代码修改该实例程序,实现基于 Oracle 上账户表变更的实时欺诈检测。


1.传统实时欺诈检测方案分析

Flink示例程序会检测每一笔交易,若发现一个帐户在 1 分钟内,先出现了一笔小交易(小于 1),后面又出现了一笔大交易(大于 500),则认为出现了欺诈交易,立即输出警告。具体的代码解析可以阅读基于DataStream API 实现欺诈检测


但是,示例程序中的数据来源 TransactionSource 的数据来源是一个静态数组 private static List<Transaction> data = Arrays.asList(new Transaction(1L, 0L, 188.23D)...的迭代器。一般情况下,客户的余额是存储在 Oracle 的账户表中的。怎么将客户余额的变化输出到 Flink 中,来实现实时的欺诈检测列?能想到的方案列举如下:


  • 方案 1:轮询从 Oracle 账户表查询余额变更 应用程序固定时间间隔去轮询 Oracle 账户表的数据,检查到某个客户的账户余额发生了变化后,通知 Flink 进行欺诈检测。这种方案需要不断轮询 Oracle 数据库,对有数据库性能影响,并且就算轮询的间隔足够短,还是有可能漏掉了一些账户变更信息,不可取。

  • 方案 2:业务代码修改 Oracle 账户表时通知 Flink 修改交易程序,在它去更新 Oracle 账户表时,通知 Flink 进行欺诈检测。这种方案的优势在于不会丢掉任何账户变更的事件;但是需要修改交易程序,会导致业务程序耦合度提升。实现上如果采用同步模式,可能会由于 Flink 失败而导致交易失败,也会大大提高交易持续时间;而采用异步方式,需要考虑通知 Flink 和写入账户表的原子性,有可能成功通知了 Flink 但是写入账户表失败了,也有可能写入账户表成功了,却没有通知到 Flink。

  • 方案 3:利用 logminer 抽取账户表变更 Oracle 提供 logminer 来将数据库日志反解析成变更 SQL,这样就可以将 Oracle 账户表更新的信息抽取出来,通知 Flink 进行欺诈检测。这种方案的优点在于直接基于 Oracle 数据表的修改来做增量的同步(oracle 日志中记录账户表修改并提交了,说明客户修改账户是成功的,不用担心 Flink 通知了,账户表反而写失败了),降低了业务的耦合度,也不会担心丢失了账户变更事件;但是 logminer 每次只能挖掘一整个日志的变化,没法断点续传,并且挖掘的数据也只能写入 alert.log,会污染错误日志。这个方案缺陷也比较大。


上述三个方案都有一定的缺陷和问题,要么可能会漏掉部分变更数据,要么可能影响 oracle 性能。logminer 相对来说是避免漏数据,对数据库性能影响最小的方案,是否有一个类似于 logminer 而且支持断点续传,对 Flink 又比较友好的方案?


  • 方案 4:利用 OGG 抽取账户表变更 Oracle 公司的 OGG 和国内部分厂商基本能避免 logminer 的缺点,但是需要在 Oracle 服务器上安装客户端,有侵入,并且配置和使用比较复杂,价格上也不是很友好。


最后我们找到了一个轻量、免费日志解析工具QDecoder来替代 OGG,将 oracle 账户表的变更通知到 Flink,从而实现欺诈检测的方法。官方介绍:QDecoder 是沃趣科技自主研发,基于 Oracle redo 日志进行二进制解析的订阅同步工具,易集成、零侵入、高性能、全免费。目前,QDecoder 已经在多家证券和银行上线使用,稳定运行,得到诸多客户的肯定与认可。


2.Oracle 源端增量输出

期待实现的实时欺诈检测的架构:

如图:

  • QDecoder 实时获取 Oracle 的 redo log,将账户表数据变更解析出来,写入 kafka 的指定 topic 中。

  • Flink 程序(FraudDetection)使用 flink-connector-kafka 从 kafka 获取交易数据,进行流式计算,识别出可能的欺诈交易,并输出警告。


2.1 安装 QDecoder

为了从 Oracle 的日志中挖掘出 account 表的变更数据,我们需要安装 QDecoder。QDecoder 的安装也非常简单,并且它跟其他的数据库同步软件不一样,它默认是不需要到 Oracle 服务器上去部署客户端的,只需要给定 ASM 账号就可以通过网络连接到 Oracle 上取日志。

 

一键安装命令如下: docker run -it --name=qdecoder -p 9191:9191 -p 9092:9092 --pull always registry.cn-hangzhou.aliyuncs.com/woqutech/qdecoder


根据提示配置 QDecoder,更多信息可参考 https://hub.docker.com/r/woqutech/qdecoder


以下配置需要特别注意:


  • 配置项 1.1 中列出的 sql,请以 dba 权限在 oracle 中执行,这将配置 QDecoder 查询系统表需要的权限。

  • 配置项 2.1: 输入将要检测的表:qdecoder.account。简单起见,我们这里把 account 示例表新建在 qdecoder 账户下了。conn qdecoder/qdecoder;create table account(accountid int primary key, balance number);

  • 配置项 3.1: 选择输出到 kafka, bootstrap.servers 可以不输入,直接在容器中启动 kafka,qdecoder 会将变更数据写入“defaultapp.qdecoder.binlog.qdecoder”的 topic。生产环境下如果有 kafka 集群,请输入集群连接的地址和账号。


下面是配置示例: 



2.2 更新账户表观察 QDecoder 的输出

等 QDecoder 成功启动后,可以按照提示运行 binlogdumpK,从 kafka 读取 binlog 并打印出来。由于 account 表并没有更新,此时没有变更数据输出。


我们来更新 account 表:

insert into account values(1,10000);insert into account values(2,20000);insert into account values(3,30000);commit;
复制代码


binlogdumpK 输出的 binlog 如下图:

上图中 schemaName: "qdecoder",tableName: "account", eventType: INSERT 表示这里是对qdecoder.account表的 INSERT 操作。其实它对应的 sql 就是insert into qdecoder.account(accountid,balance) values(1,10000)


这里

  • Oracle 的日志解析出来是遵循阿里巴巴canal的 protobuf 格式,这个是关系型数据库增量输出的标准格式

  • binlogdumpK 只是为了观察一下 QDecoder 的输出,你可以随时关掉它,这并不影响 QDecoder 和 Flink 程序的运行。

至此,我们已经利用 QDecoder 从 Oracle 的日志解析出账户表的数据变更,那么,怎么将这些输出作为 Flink 现有的欺诈检测的输入源列呢?


3.实现 Kafka Consumer 对接 Oracle 源端增量数据

Flink示例程序是利用 addSource(new TransactionSource())来将静态数组作为源加入流处理的

DataStream<Transaction> transactions = env            .addSource(new TransactionSource())            .name("transactions");
复制代码


我们要修改为从 kafka 中取日志,可以利用 DataStream Connectors 配置一个 Kafka Consumer。按照Flink自带的示例中的DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));添加数据源。这里

  • "topic"应该修改为"defaultapp.qdecoder.binlog.qdecoder",这是 QDecoder 插入 kafka 的 topic 名称

  • properties 修改为你连接的 kafka 的地址,我们这里使用的是 QDecoder 自带的 kafka,修改为 127.0.0.1:9092

  • SimpleStringSchema()是直接以 string 的方式输出出来,QDecoder 的输出是 protobuf 的,不能直接用 string 的方式输出,而需要解析出来转换成欺诈检测认识的 Transaction


3.1 binlog 转换成 Transaction 实现

为了能完全重用 Flink 示例程序中的代码,我们这里需要

  • 使用 com.alibaba.otter.canal.protocol 反序列化 QDecoder 插入到 kafka 的 binlog 日志

  • 将 binlog 日志中的账户表变更转换成 Transaction 对象。


我们实现一个BinlogTransactionSchema类反序列化 binlog, 计算 balance 的变化,生成 org.apache.flink.walkthrough.common.entity.Transaction 对象。


主要代码如下:

// 反序列化EntryCanalEntry.Entry entry = CanalEntry.Entry.parseFrom(binlog);// 获取表名entry.getHeader().getTableName();// 获取Entry type:ROWDATA|TRANSACTIONBEGIN|TRANSACTIONEND|...entry.getEntryType();// 获取row changeCanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());// 获取Event type: INSERT|UPDATE|DELTE|...rowChange.getEventType();// 获取执行时间long executeTimeMs = entry.getHeader().getExecuteTime();// 获取row dataCanalEntry.RowData rowData = rowChange.getRowDatas(0);
// 获取旧值for (CanalEntry.Column col : rowData.getBeforeColumnsList()) {    if (col.getName().equalsIgnoreCase("accountid")) {        oldRow.accountId = Long.parseLong(col.getValue());     } else if (col.getName().equalsIgnoreCase("balance")) {        oldRow.balance = Double.parseDouble(col.getValue());    }}
// 获取新值for (CanalEntry.Column col : rowData.getAfterColumnsList()) {    if (col.getName().equalsIgnoreCase("accountid")) {        newRow.accountId = Long.parseLong(col.getValue())    } else if (col.getName().equalsIgnoreCase("balance")) {        newRow.balance = Double.parseDouble(col.getValue());    }}
// 创建transactionnew Transaction(newRow.accountId, executeTimeMs, Math.abs(newRow.balance-oldRow.balance));
复制代码

3.2 将数据源加入 Flink

QDecoder 输出的增量数据转换后,可以直接作为数据源形成 DataStream

修改后代码如下:

public static void main(String[] args)throws Exception{StreamExecutionEnvironment env=StramExecutionEnvironment.getExecutionEnvironment();
// 创建FlinkKafkaConsumer,用BinlogTransactionSchema反序列化Properties properties = new Properties();properties.setProperty("bootstrap.servers", "127.0.0.1:9092");properties.setProperty("group.id", "flink.test");
// 指定"defaultapp.qdecoder.binlog.qdecoder"的topic,使用BinlogTransactionSchema来将QDecoder账户表的更新转换成TransactionFlinkKafkaConsumer<Transaction> kafkaSource = new FlinkKafkaConsumer<Transaction>("defaultapp.qdecoder.binlog.qdecoder", new BinlogTransactionSchema(), properties);kafkaSource.setStartFromEarliest();
DataStream<Transaction> transactions = env .addSource(kafkaSource) .name("transactions");}
复制代码


3.3 验证运行

修改好的应用程序我们已经上传到 github 上供下载,测试运行步骤如下:

3.3.1 程序下载
git clone https://github.com/woqutech/qdecoder.gitcd qdecoder/FlinkSample/frauddetection
复制代码


3.3.2 程序运行

frauddetection 是一个 maven 创建的项目,有 pom.xml 项目文件,可以导入各种 IDE,进行调试和运行。这里只介绍大家常用的 intellij IDEA 运行验证方法

  • 打开项目 开始界面:open or import -> 选择 frauddetection 目录 或者 菜单: file/open -> 选择 frauddetection 目录

  • 运行程序 菜单: run -> run 'FraudDetectionJob'


注意:如果报告 slf4j 重复,且有大量的 log 输出,请在 module/dependencies 中删除 ch.qos.logback:logback-classic 和 ch.qos.logback:logback-core。

3.3.3 欺诈验证检测

QDecoder 和 frauddetection 正常运行以后,我们就可以更新 account.balance,模拟交易,来观察 frauddetection 程序是否能进行欺诈检测了。


  • 在 Oracle 上执行以下 SQL

update account set balance = balance - 0.1 where accountid = 1;commit;update account set balance = balance - 0.2 where accountid = 1;commit;update account set balance = balance + 100 where accountid = 2;commit;update account set balance = balance - 501 where accountid = 1;commit;update account set balance = balance - 200 where accountid = 2;commit;
复制代码


这里我们模拟了两个账号 account=1/2 的交易。其中 account=2 先入账 100 元,然后扣了 200 元是正常交易,不满足欺诈检测条件。account=1 的账户余额先扣了 0.1 元,然后再扣了 0.2 元,最后直接扣了 501 元,满足欺诈检测的条件:在一分钟内(private static final long ONE_MINUTE = 60 * 1000;)先做小于 1 元(private static final double SMALL_AMOUNT = 1.00)的小额交易,然后再做 500 以上的大额交易(private static final double LARGE_AMOUNT = 500.00;)。


  • 检查欺诈交易是否被正确识别

执行完上述 SQL,frauddetection 程序会立即输出:

21:11:20,107 INFO  org.apache.flink.walkthrough.common.sink.AlertSink           [] - Alert{id=1}
复制代码

表示 accountid=1 的帐号检测到欺诈交易。

至此,我们的 frauddetection 欺诈检测程序就修改完成了。


回顾整个过程:

  • 利用 QDecoder 实现了 Oracle 数据库的日志订阅增量数据导出

  • 利用 alibaba canal 的 protobuf 来解析 Oracle 增量变化

  • 实现了不到 100 行代码的 BinlogTransactionSchema 类,用于将解析数据转换为欺诈检测识别的数据

  • 利用 Flink 的 DataStream Connectors 从 kafka 取出增量变化数据

  • 利用了 Flink 实现增量流数据的有状态计算分布式处理,实现欺诈检测


4.总结

综上所述,我们利用 QDecoder 和 Flink 写了少于 100 行的代码,实现了一个简化版的银行信用卡欺诈检测程序。整体来说,利用 Flink+QDecoder 可以很容易将 oracle 的增量变化取出来,同步给大数据平台或者数据湖,有助于将静态数据流动起来,帮助企业盘活数据资产,提升运营决策效果。

发布于: 19 小时前阅读数: 1014
用户头像

沃趣科技

关注

玩转数据库生态的技术迷 2022.02.23 加入

还未添加个人简介

评论

发布
暂无评论
对信用卡欺诈 Say No!百行代码实现简化版实时欺诈检测_数据库表_沃趣科技_InfoQ写作平台