写点什么

代码回现 | 如何实现交易反欺诈?

用户头像
VoltDB
关注
发布于: 2021 年 04 月 14 日

一、背景概述

交易反欺诈是 VoltDB 适用场景之一,是典型的事件驱动的业务,核心是摄取高频的交易数据,并逐条对交易进行一系列复杂的反欺诈规则校验,最终生成评判交易可疑度的分值,发送给下游业务系统,触发交易拦截动作。反欺诈规则中涉及大量的通过分析历史交易生成的指标项,在 VoltDB 中进行流式计算,可基于本地保存的丰富的上下文数据对事件进行分析决策,使实时计算靠近上下文数据,获得性能优势。

二、实例回现

下面我们通过一个刷卡的应用,展示 VoltDB 是如何实现一个简单的反欺诈用例的。为了让示例代码更加简洁,又能突出 VoltDB 的功能,这里使用一个地铁刷卡的场景替代金融交易(如信用卡刷卡),以避免引入过多专业的金融业务知识。同时一个繁忙地铁系统产生的交易吞吐量不可小觑,定义的反欺诈规则也更容易理解。可以通过这个链接来访问详细的代码https://github.com/ssomagani/event-driven-transactions在这个应用中,模拟如下几个场景:


  1. 多辆列车在地铁站点之间运行,生成列车进站事件。通过这个场景可以了解,如何将数据发布到 VoltDB Topic 中,以及如何消费 Topic 中的数据。

  2. 公交卡充值操作。通过这个场景,可以了解,如何使用一个包含自定义业务规则的 procedure 来处理 Topic 中的数据,同时使用 Stream 对象将数据导出到 Topic 中,并通过视图对 Stream 中的数据流进行统计,生成实时的统计报表。视图会逐条统计 Stream 中的流数据,将处理结果保存到视图中,是 VoltDB 实现流式计算的方式之一。

  3. 乘客刷卡乘车,生成高频交易数据。通过这个场景,可以了解,如何使用 VoltDB 数据库客户端 api 直接操作数据表(区别与将数据发送到 Topic 中),保存交易数据。如何通过 VoltDB 的 java procedure 定制反欺诈校验规则,并调用 java procedure 进行交易校验和反欺诈行为。让我们来具体了解一下,在 VoltDB 中运行这个用例的过程。

2.1 准备工作

1. 启用 VoltDB Topic 功能 VoltDB 提供一个统一的配置文件,主要的特性都可以在其中进行定义,如:持久化、高可用、安全性等等,这里主要介绍与案例相关的 VoltDB Topic 功能。如下配置开启了 Topic 服务,并在服务器上开启端口 9999,用于接受客户端发来的消息。


  <Topics enabled="true">        <properties>            <property name="port">9999</property>            <property name="group.initial.rebalance.delay.ms">0</property>            <property name="retention.policy.threads">1</property>        </properties>        <profiles>            <profile name="retain_compact">                <retention policy="compact" limit="2048" />        </profile>        </profiles>    </Topics>
复制代码


2.根据特定配置文件启动 VoltDB3.创建 Topic,Topic 的用途后面的代码分析中提到


CREATE Topic TRAINTOPIC execute procedure train_events.insert;CREATE TOPIC RECHARGE execute procedure RechargeCard;CREATE TOPIC using stream CARD_ALERT_EXPORT properties(topic.format=avro);create topic using stream FRAUD properties(topic.format=avro,consumer.keys=TRANS_ID);
复制代码


4.创建数据表在处理实时事件流时,可以充分利用底层的数据库引擎,充分利用本地关系型数据进行数据分析,得到反欺诈业务指标。在本例中将创建如下数据表和视图(省略具体 DDL)



5.初始化数据通过 VoltDB 的数据导入功能,从 csv 文件中初始化站点和列车


csvloader --file $PROJ_HOME/data/redline.csv --reportdir log stationscsvloader --file $PROJ_HOME/data/trains.csv --reportdir log trains
复制代码

2.2 代码分析-列车运行

在这个场景中,客户端模拟 8 辆列车在 17 个站点之间运行,产生进站事件并发送到 Topic。由于设定的列车进出站时间比较短(微秒为单位),所以会产生高频事件流。在服务端,VoltDB 完成:1.消息接收 2.消费消息 3.将列车进站事件记录到数据库中在客户端,通过 java 类 TrainProducer 生成多辆列车进站事件,并将事件发送到 VoltDB Topic 中。TrainProducer 的执行命令如下:


java metro.pub.TrainProducer localhost:9999 TRAINTOPIC 8
复制代码


TrainProducer 类接收四个参数:


  1. .指定接收列车进站和离站事件的 VoltDB 服务器端口。这里假设在同一台机器上运行 client 代码和 VoltDB,而前面在 VoltDB 配置文件中我们已经指定 Topic 的监听端口是 9999。

  2. 指定 VoltDB broker

  3. 指定数据发送的 Topic 名称。

  4. 指定要模拟的列车数量。


分析一下 TrainProducer 的主要方法,main 方法生成 10 个线程,每 50 毫秒执行一次 publish()方法,将列车进出站时间发送到 Topic“TRAINTOPIC”中。


public static void main(String[] args) {        ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10);        TrainProducer producer = new TrainProducer(args[0], args[1], Integer.parseInt(args[2]));        System.out.println("Scheduling trains");        EXECUTOR.scheduleAtFixedRate (                () -> {                    producer.publish(producer.getNewEvents());                }, 1, 50, MILLISECONDS);    }
复制代码


跟踪代码找到 producer 的定义,它其实就是原生的 KafkaProducer,所以可以看到 VoltDB Topic 完全兼容 kafka api。而 brokers 即是 main 方法中的传参 localhost:9999,因此上面 producer.getNewEvents()方法生成的数据将被发送到 VoltDB Topic 中。


private Producer<String, TrainEvent> createProducer() {        Properties props = new Properties();        props.put("bootstrap.servers", brokers);        props.put("acks", "all");        props.put("retries", 0);        props.put("batch.size", 16384);        props.put("linger.ms", 1);        props.put("buffer.memory", 33554432);        props.put("key.serializer",           "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "metro.serde.TrainEventSer"); Producer<String, TrainEvent> producer = new KafkaProducer <String, TrainEvent>(props); return producer; }
复制代码


Publish 方法所发送的消息由 producer.getNewEvents()方法生成。有必要提前看一下 Stations 类,其中定义了 17 个火车站点,包括每个站点的到下一个站点的运行时间(Station.nextStnDuration)和本站点停车时间(Station.stnWaitDuration),时间以微秒为单位。所有列车将依次在这些站点中运行。


 static HashMap<Integer, Station> idToStationMap = new HashMap<>();    static {        idToStationMap.put(1, new Station(1, 1200000, 450000));        idToStationMap.put(2, new Station(2, 1050000, 250000));        idToStationMap.put(3, new Station(3, 850000, 300000));        idToStationMap.put(4, new Station(4, 900000, 350000));        idToStationMap.put(5, new Station(5, 500000, 260000));        idToStationMap.put(6, new Station(6, 950000, 190000));        idToStationMap.put(7, new Station(7, 450000, 130000));        idToStationMap.put(8, new Station(8, 200000, 280000));        idToStationMap.put(9, new Station(9, 200000, 110000));        idToStationMap.put(10, new Station(10, 450000, 300000));        idToStationMap.put(11, new Station(11, 550000, 200000));        idToStationMap.put(12, new Station(12, 550000, 200000));        idToStationMap.put(13, new Station(13, 800000, 150000));        idToStationMap.put(14, new Station(14, 950000, 100000));        idToStationMap.put(15, new Station(15, 1000000, 130000));        idToStationMap.put(16, new Station(16, 1200000, 220000));        idToStationMap.put(17, new Station(17, 1500000, 500000));}   public static class Station {        public final int stationId;        public final int nextStnDuration;        public final int stnWaitDuration;        public Station(int stationId, int nextStnDuration, int stnWaitDuration) {            this.stationId = stationId;            this.nextStnDuration = nextStnDuration;            this.stnWaitDuration = stnWaitDuration;        }    }
复制代码


所以 getNewEvents 主要的逻辑是首先随机设定列车从任意站点出发,然后调用 next()根据系统当前时间和站点的 Station.nextStnDuration、Station.stnWaitDuration 来判断每辆列车目前运行到哪个站点,如果 next 返回的 LastKnownLocation 对象有变化,则判断列车已进入下一站,将列车进站事件 trainEvent 放到 records 中,用于发送给 Topic。(注:列车调度不是本样例的重点,因此 next 方法不会考虑列车的冲突问题,它假设站点之间由足够多的轨道,可以供多个列车并行)。


public List<TrainEvent> getNewEvents() {        ArrayList<TrainEvent> records = new ArrayList<>();        for(TrainEvent trainEvent : idToTrainMap.values()) {            LastKnownLocation prevLoc = trainEvent.location;            LastKnownLocation curLoc = next(prevLoc, LocalDateTime.now());            if(!prevLoc.equals(curLoc)) {                trainEvent = new TrainEvent(trainEvent.trainId, curLoc);                idToTrainMap.put(trainEvent.trainId, trainEvent);                records.add(trainEvent);            }        }        return records;    }
复制代码


Topic TRAINTOPIC 定义如下,train_events.insert 是 VoltDB 为表创建的默认存储过程,命名规则为[tablename].insert。Topic 与存储过程连用,表示存储过程 train_events.insert 消费该 Topic TRAINTOPIC 中的 trainEvent 数据,并写入 train_events 表中。


CREATE Topic TRAINTOPIC execute procedure train_events.insert;
复制代码

2.23 代码分析-公交卡充值

在这个场景中,客户端将完成充值消息发送。在服务端,VoltDB 完成:


  1. 消息接收

  2. 消费消息

  3. 使用自定义逻辑处理消息 将充值数据更新到数据库中

  4. 生成充值消息,并将数据写入 stream 对象中

  5. 基于 stream 对象创建视图,来生成实时的充值统计报表

  6. 将 stream 中的充值消息发布到 Topic 中,供后续(VoltDB 之外的)数据处理逻辑进行消费。例如被 spark 消费,由于进行后续的批处理逻辑。


在客户端通过执行 java 类 CardsProducer,首先初始化公交卡记录,并将记录写入数据库表中。然后随机生成卡片充值事件,发送事件到 Topic RECHARGE 中。CardsProducer 的执行命令如下:


java metro.pub.CardsProducer --mode=recharge --servers=localhost:9999 --Topic=RECHARGE
复制代码


CardsProducer 类接收三个参数:


  1. 执行模式,用于指定是初始化公交卡记录还是生成充值事件。

  2. 指定 VoltDB broker

  3. 指定数据发送的 Topic 名称分析一下 CardsProducer 的主要方法,main 方法生成 10 个线程,每 5 毫秒执行一次 publish()方法,将列车进出站时间发送到 Topic“RECHARGE”中。


    public static void main(String[] args) throws IOException {        CONFIG.parse("CardsProducer", args);        if(CONFIG.mode.equals("new")) {            genCards(CONFIG);            return;        }        ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10);        CardsProducer producer = new CardsProducer(CONFIG.servers, CONFIG.Topic);        System.out.println("Recharging Cards");        EXECUTOR.scheduleAtFixedRate (                () -> {                    producer.publish(producer.getRechargeActivityRecords(1));                }, 1, 5, MILLISECONDS);    }
复制代码


和前面 TrainProducer 一样,CardsProducer 中的 producer 也是 KafkaProducer,不多介绍。getRechargeActivityRecords 方法用来生成一条随机的充值事件,包括卡号、充值金额和充值站点。每 5 毫秒执行一次。


  public List<CardEvent> getRechargeActivityRecords(int count) {        final ArrayList<CardEvent> records = new ArrayList<>();        int amt = (ThreadLocalRandom.current().nextInt(18)+2)*1000;        int stationId = ThreadLocalRandom.current().nextInt(1, 18);        ThreadLocalRandom.current().ints(count, 0, CONFIG.cardcount).forEach((cardId)                -> {                    records.add(new CardEvent(cardId, amt, stationId));                    }        );        return records;    }
复制代码


这个场景中,Client 端的代码非常简单,到此为止。更多的逻辑在服务端定义,请看以下。Topic 用于接收充值事件,它的定义如下:


CREATE TOPIC RECHARGE execute procedure RechargeCard;
复制代码


其中 RechargeCard 用于消费 Topic 中的数据,而 RechargeCard 是一个 java procedure,它通过 java+sql 的方式,自定义了业务逻辑。java procedure 是 VoltDB 在处理流数据时经常用到的对象,它是一个运行在 VoltDB 服务端的 java 类,而非 client 端代码。它需要提前编译成 jar 包(如下 procs.jar),并加载到 VoltDB java 运行时环境中。之后使用如下 DDL 定义。定义了 RechargeCard 后,在上面的 CREATE TOPIC 中才能被引用。


sqlcmd --query="load classes $PROJ_HOME/dist/procs.jar"CREATE PROCEDURE PARTITION ON TABLE cards COLUMN card_id PARAMETER 0 FROM CLASS metro.cards.RechargeCard;
复制代码


让我们看一下 RechargeCard 中的逻辑,重点关注如何将 java 业务逻辑与 SQL 进行结合。其中定义 run()方法和四个 sql 语句。RechargeCard 从 Topic RECHARGE 中消费数据,进行反序列化之后,逐条将数据(即充值事件)作为传参交给 run()方法,run()是 procedure 的入口方法。voltQueueSQL 是 VoltDB 的 server 端 api,用来执行 sql 并返回结果。Sql getCard 和 getStationName 首先根据从 Topic 中获取的数据进行充值事件合法性校验,如果数据库中没有对应的充值站点或公交卡记录,则执行 sql exportNotif 写入一条错误信息。否则,update VoltDB 数据库中对应公交卡,增加余额,并执行 sql exportNotif 写入一条成功信息。


public class RechargeCard extends VoltProcedure {    public final SQLStmt updateBalance = new SQLStmt("UPDATE cards SET balance = balance + ? WHERE card_id = ? AND card_type = 0");    public final SQLStmt getCard = new SQLStmt("SELECT * from cards WHERE card_id = ?");    public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)");    public final SQLStmt getStationName = new SQLStmt("SELECT name FROM stations WHERE station_id = ?");     public long run(int cardId, int amt, int stationId) {        voltQueueSQL(getStationName, stationId);        voltQueueSQL(getCard, cardId);        String station = "UNKNOWN";                final VoltTable[] results = voltExecuteSQL();        if(results.length == 0)             exportError(cardId, station);                VoltTable stationResult = results[0];        if(stationResult.advanceRow())             station = stationResult.getString(0);                VoltTable card = results[1];        if(card.advanceRow()) {            voltQueueSQL(updateBalance, amt, cardId);                        String name = card.getString(5);            String phone = card.getString(6);            String email = card.getString(7);            int notify = (int) card.getLong(8);                        voltQueueSQL(updateBalance, amt, cardId);            voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, "Card recharged successfully");                        voltExecuteSQL(true);        } else {            exportError(cardId, station);        }        return 0;}    private void exportError(int cardId, String station) {        exportError(cardId, station, "", "", "", 0, "Could not locate details of card for recharge");    }        private void exportError(int cardId, String station, String name, String phone, String email, int notify, String msg) {        voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, msg);        voltExecuteSQL(true);    }}
复制代码


exportNotif 的定义如下,其中 CARD_ALERT_EXPORT 是 VoltDB 的 stream 数据库对象,一种数据管道,insert 进去的数据逐一流过。


public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)");
复制代码


可以在 CARD_ALERT_EXPORT 上添加数据处理逻辑,实现流计算效果。这个场景中,简单的在 Stream 上创建了一个视图,用于生成实时统计报表。视图的定义如下:


CREATE VIEW card_export_stats(card_id, station_name, rechargeCount) AS   SELECT card_id, station_name, count(*) from CARD_ALERT_EXPORT   GROUP BY card_id, station_name;
复制代码


最后,我们定义 Stream 中的数据最终流向另外的 Topic,该 Topic 可以让 VoltDB 之外的大数据产品进行消费,完成下游数据处理逻辑。


CREATE TOPIC using stream CARD_ALERT_EXPORT properties(Topic.format=avro);
复制代码

2.4 代码分析-乘客刷卡乘车

这个场景中,客户端随机生成大量乘客刷卡进站记录,并发送给数据库处理。服务端完成如下操作:1.首先进行一系列校验,如验证卡信息,卡余额,是否盗刷等反欺诈操作。2.将所有刷卡行为都记录到数据表中。并将余额不足和复合欺诈逻辑的刷卡事件分别发布到不同的 Topic 中,供其他下游系统订阅。在客户端通过执行 java 类 RidersProducer,与前面两个场景不同,RidersProducer 类直接连接 VoltDB 数据库将数据写入数据表中,而不是将数据发送到 VoltDB Topic 中。用来展示 VoltDB 的多种使用方式。connectToOneServerWithRetry 使用 VoltDB client api 连接指定 ip 的 VoltDB 数据库。


  void connectToOneServerWithRetry(String server, Client client) {        int sleep = 1000;        while (true) {            try {                client.createConnection(server);                break;            }            catch (Exception e) {                System.err.printf("Connection failed - retrying in %d second(s).\n", sleep / 1000);                try { Thread.sleep(sleep); } catch (Exception interruted) {}                if (sleep < 8000) sleep += sleep;            }        }        System.out.printf("Connected to VoltDB node at: %s.\n", server);    }
复制代码


RidersProducer 类创建 100 个线程,runBenchmark 方法中每 200 毫秒这些线程执行一次 getEntryActivityRecords。getEntryActivityRecords 随机生成一条乘客进站乘车记录,记录内容包括卡号、当前时间、进站站点 id 等


private static final ScheduledExecutorService EXECUTOR =     Executors.newScheduledThreadPool(100);public void runBenchmark() throws Exception {        int microsPerTrans = 1000000/RidersProducer.config.rate;        EXECUTOR.scheduleAtFixedRate (                () -> {                    List<Object[]> entryRecords = getEntryActivityRecords(config.cardcount);//生成随机的进站记录                    call(config.cardEntry, entryRecords);//将数据发送到VoltDB数据库                }, 10000, microsPerTrans, MICROSECONDS);    }    public static List<Object[]> getEntryActivityRecords(int count) {        final ArrayList<Object[]> records = new ArrayList<>();        long curTime = System.currentTimeMillis();        ThreadLocalRandom.current().ints(1, 0, count).forEach((cardId)                -> {                    records.add(new Object[] {cardId, curTime, Stations.getRandomStation().stationId, ENTER.value, 0});                    }        );        return records;    }
复制代码


接着调用 call 方法,将数据 records 发送到数据库进行处理。Call 方法定义如下,callProcedure 是 VoltDB 的 client 端 api,用于将数据发送给指定名称的 procedure 进行处理,可以通过同步和异步 IO 两种方式进行调用,异步调用时需要指定回调函数对数据库调用的返回结果进行处理,即本例中的自定义了 BenchmarkCallback。


   protected static void call(String proc, Object[] args) {        try {            client.callProcedure(new BenchmarkCallback(proc, args), procName, args);        } catch (IOException e) {            e.printStackTrace();        }    }
复制代码


Call 方法将数据发送给 procedure,procedure 名称由如下代码指定。一起看看 procedure 中的具体逻辑。


 @Option(desc = "Proc for card entry swipes")        String cardEntry = "ValidateEntry";
复制代码


Procedure ValidateEntry 的部分定义,首先定义了 6 个 SQL。


 //查询公交卡是否存在    public final SQLStmt checkCard = new SQLStmt(        "SELECT enabled, card_type, balance, expires, name, phone, email, notify FROM cards WHERE card_id = ?;");    //卡充值    public final SQLStmt chargeCard = new SQLStmt(        "UPDATE cards SET balance = ? WHERE card_id = ?;");    //查询指定站点的入站费用    public final SQLStmt checkStationFare = new SQLStmt(        "SELECT fare, name FROM stations WHERE station_id = ?;");    //记录进站事件    public final SQLStmt insertActivity = new SQLStmt(        "INSERT INTO card_events (card_id, date_time, station_id, activity_code, amount, accept) VALUES (?,?,?,?,?,?);");    //再次用到card_alert_export 这个stream对象,用于发送公交卡欠费消息    public final SQLStmt exportActivity = new SQLStmt(        "INSERT INTO card_alert_export (card_id, export_time, station_name, name, phone, email, notify, alert_message) VALUES (?,?,?,?,?,?,?,?);");    //将刷卡欺诈行为写入stream对象fraud中    public final SQLStmt publishFraud = new SQLStmt(            "INSERT INTO fraud (trans_id, card_id, date_time, station, activity_type, amt) values (?, ?, ?, ?, ?, ?)"            );
复制代码


值得说明的,上面最后一个 sql 中用到的 fraud 是另外一个 stream 对象,用于插入刷卡欺诈事件,通过 DDL 定义其中的刷卡欺诈行为最终会发布到 VoltDB Topic 中,用于下游处理产品消费。


CREATE STREAM FRAUD partition on column CARD_ID (  TRANS_ID varchar not null,  CARD_ID integer not null,  DATE_TIME timestamp not null,  STATION integer not null,  ACTIVITY_TYPE TINYINT not null,  AMT integer not null);create Topic using stream FRAUD properties(Topic.format=avro,consumer.keys=TRANS_ID);
复制代码


前面已经提到 run 方法是 procedure 的入口方法,VoltDB 运行 procedure 时,自动调用该方法。前面客户端传进的 records 记录,被逐一传递到 run 方法到参数中进行处理。run 方法定义如下


public VoltTable run(int cardId, long tsl, int stationId, byte activity_code, int amt) throws VoltAbortException {        //查询公交卡是否存在        voltQueueSQL(checkCard, EXPECT_ZERO_OR_ONE_ROW, cardId);        //查询指定站点的交通费用        voltQueueSQL(checkStationFare, EXPECT_ONE_ROW, stationId);        VoltTable[] checks = voltExecuteSQL();        VoltTable cardInfo = checks[0];        VoltTable stationInfo = checks[1];        byte accepted = 0;
//如果公交卡记录等于0,说明卡不存在 if (cardInfo.getRowCount() == 0) {
//记录刷卡行为到数据库表中,将accept字段置为拒绝“REJECTED” voltQueueSQL(insertActivity, cardId, tsl, stationId, ACTIVITY_ENTER, amt, ACTIVITY_REJECTED);voltExecuteSQL(true);//返回“被拒绝”消息给客户端。 return buildResult(accepted,"Card Invalid"); }
// 如果卡存在,则取出卡信息。 cardInfo.advanceRow(); //卡状态,0不可用,1可用 int enabled = (int)cardInfo.getLong(0); int cardType = (int)cardInfo.getLong(1); //卡余额 int balance = (int)cardInfo.getLong(2); TimestampType expires = cardInfo.getTimestampAsTimestamp(3); String owner = cardInfo.getString(4); String phone = cardInfo.getString(5); String email = cardInfo.getString(6); int notify = (int)cardInfo.getLong(7);
// 查询指定站点的进站费用 stationInfo.advanceRow(); //指定站点的进站费用 int fare = (int)stationInfo.getLong(0); String stationName = stationInfo.getString(1); // 刷卡时间 TimestampType ts = new TimestampType(tsl);
// 如果卡状态为不可用 if (enabled == 0) { //向客户端返回“此卡不可用” return buildResult(accepted,"Card Disabled"); }
// 如果卡类型为“非月卡” if (cardType == 0) { // 如果卡内余额充足 if (balance > fare) { //isFrand为反欺诈策略,后面介绍 if (isFraud(cardId, ts, stationId)) { // 如果认定为欺诈,记录刷卡记录,记录类型为“欺诈刷卡” voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_FRAUD); //并且把欺诈事件写入stream,并最终被发布到VoltDB Topic中。见前面STREAM FRAUD到ddl定义 voltQueueSQL(publishFraud, generateId(cardId, tsl), cardId, ts, stationId, ACTIVITY_ENTER, amt); voltExecuteSQL(true); //向客户端返回“欺诈交易”消息 return buildResult(0, "Fraudulent transaction"); } else { // 如果不是欺诈行为,则减少卡内余额,完成正常消费 voltQueueSQL(chargeCard, balance - fare, cardId); //记录正常的刷卡事件 voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_ACCEPTED); voltExecuteSQL(true); //向客户端返回卡内余额 return buildResult(1, "Remaining Balance: " + intToCurrency(balance - fare)); } } else { // 如果卡内余额不足,记录刷卡失败事件。 voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, 0, ACTIVITY_REJECTED); if (notify != 0) { //再次用到card_alert_export 这个stream对象,用于发送公交卡欠费消息 voltQueueSQL(exportActivity, cardId, getTransactionTime().getTime(), stationName, owner, phone, email, notify, "Insufficient Balance"); } voltExecuteSQL(true); //向客户端返回“余额不足“消息 return buildResult(0,"Card has insufficient balance: "+intToCurrency(balance)); } } }
复制代码


以上代码中有一个 isFraud 方法,用于判定是否为欺诈性刷卡。这里定义了一些简单反欺诈规则


  1. 如果一秒钟内相同的卡片有 1 次以上的刷卡记录,认定为欺诈。因为不可能存在时间间隔如此短的刷卡行为,可能是由于有多张伪造卡片在同时刷卡。

  2. 同一张卡在过去一小时内,在 5 个或 5 个以上站点刷卡进站。假设这同样被认为是由于有多张伪造卡片在同时刷卡。

  3. 同一张卡在过去一小时内,有过 10 次以上刷卡进站记录。进出站次数太多,暂停使用一段时间。


isFraud方法根据当前刷卡记录中的数据,结合数据库中的历史记录实现以上反欺诈规则。历史刷卡记录被保存在card_events表中,另外基于这张表创建了视图,统计每张卡在一秒钟内是否有过刷卡记录。
复制代码


CREATE VIEW CARD_HISTORY_SECOND as select card_id, TRUNCATE(SECOND, date_time) scnd from card_events group by card_id, scnd;isFraud方法的定义    public final SQLStmt cardHistoryAtStations = new SQLStmt(        "SELECT activity_code, COUNT(DISTINCT station_id) AS stations " +        "FROM card_events " +        "WHERE card_id = ? AND date_time >= DATEADD(HOUR, -1, ?) " +        "GROUP BY activity_code;"    );
public final SQLStmt cardEntries = new SQLStmt( "SELECT activity_code " + "FROM card_events " + "WHERE card_id = ? AND station_id = ? AND date_time >= DATEADD(HOUR, -1, ?) " + "ORDER BY date_time;" );
public final SQLStmt instantaneousCardActivity = new SQLStmt( "SELECT count(*) as activity_count " + "FROM CARD_HISTORY_SECOND " + "WHERE card_id = ? " + "AND scnd = TRUNCATE(SECOND, ?) " + "GROUP BY scnd;" ); public boolean isFraud(int cardId, TimestampType ts, int stationId) { voltQueueSQL(instantaneousCardActivity, cardId, ts); voltQueueSQL(cardHistoryAtStations, cardId, ts); voltQueueSQL(cardEntries, cardId, stationId, ts); final VoltTable[] results = voltExecuteSQL(); final VoltTable cardInstantaneousActivity = results[0]; final VoltTable cardHistoryAtStationisTable = results[1]; final VoltTable cardEntriesTable = results[2]; //一秒钟之内已经有一次刷卡记录的话,返回true while (cardInstantaneousActivity.advanceRow()) { if(cardInstantaneousActivity.getLong("activity_count") > 0) { return true; } } while (cardHistoryAtStationisTable.advanceRow()) { final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code"); final long stations = cardHistoryAtStationisTable.getLong("stations");
if (activity_code == ACTIVITY_ENTER) { // 过去1小时之内在五个站点刷卡进站,返回true if (stations >= 5) { return true; } } }
byte prevActivity = ACTIVITY_INVALID; int entranceCount = 0; while (cardEntriesTable.advanceRow()) { final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code");
if (prevActivity == ACTIVITY_INVALID || prevActivity == activity_code) { if (activity_code == ACTIVITY_ENTER) { prevActivity = activity_code; entranceCount++; } else { prevActivity = ACTIVITY_INVALID; } } }
// 如果在过去1小时内有10次连续的刷卡记录,返回true。 if (entranceCount >= 10) { return true; }
return false; }
复制代码


您看好 VoltDB 吗? 马上行动吧!欢迎私信,与更多小伙伴一起探讨。


关于 VoltDBVoltDB 支持强 ACID 和实时智能决策的应用程序,以实现互联世界。没有其它数据库产品可以像 VoltDB 这样,可以同时需要低延时、大规模、高并发数和准确性相结合的应用程序加油。VoltDB 由 2014 年图灵奖获得者 Mike Stonebraker 博士创建,他对关系数据库进行了重新设计,以应对当今不断增长的实时操作和机器学习挑战。Stonebraker 博士对数据库技术研究已有 40 多年,在快速数据,流数据和内存数据库方面带来了众多创新理念。在 VoltDB 的研发过程中,他意识到了利用内存事务数据库技术挖掘流数据的全部潜力,不但可以满足处理数据的延迟和并发需求,还能提供实时分析和决策。VoltDB 是业界可信赖的名称,在诺基亚、金融时报、三菱电机、HPE、巴克莱、华为等领先组织合作有实际场景落地案例。

用户头像

VoltDB

关注

VoltDB以及数据库应用场景知识库 2020.11.02 加入

VOLTDB诞生作为支持云端部署的内存数据库,并在持续增强流计算能力,原生分布式架构提供了可伸缩性,同时完全满足ACID要求,数据安全可靠,是由2014图灵奖得主Mike Stonebraker博士领导全新设计的架构。

评论

发布
暂无评论
代码回现 | 如何实现交易反欺诈?