复杂事件处理简介

用户头像
星际行者
关注
发布于: 2020 年 08 月 02 日
复杂事件处理简介

复杂事件处理 (Complex Event Progressing,CEP) 是一种基于事件流的处理技术,它将系统数据看作不同类型的事件,通过分析事件间的关系,建立不同的事件关系序列库,利用过滤、关联、聚合、模式匹配等技术,最终由简单事件产生高级事件或商业流程。早在20世纪80年代,SQL的出现通过面向问题的方式取代了面向过程的查询数据方式,率先在数据库中广泛应用起来。90年代,Sybase率先提出触发器(Trigger)的理念,把数据的变更与事件联系起来,但触发器能够处理的数据量和复杂度有限,而且也没有时间序列的概念。2000年左右有厂商开始在这个方向上做一些基于事件和数据流的处理,并且借鉴SQL,希望通过面向问题的方式进行处理,现在已经形成了一个特殊的领域,也就是复杂事件处理(CEP)。



目前CEP系统有很多,功能也各不相同,常见的CEP系统有Esper、Shiddi、Flink、Oracle Event Processing等等,更详细的信息可参见wikipedia(Complex event processing - Wikipedia)。本文将从什么是CEP、CEP与流式计算、CEP分布式实现等几个方面简单介绍CEP。



一、什么是复杂事件处理

为了直观理解CEP,我们先来看Shiddi所提供的一个官方例子。

public class SimpleFilterSample {
public static void main(String[] args) throws InterruptedException {
// Creating Siddhi Manager
SiddhiManager siddhiManager = new SiddhiManager();
String siddhiApp = "" +
"define stream cseEventStream (symbol string, price float, volume long); " +
"" +
"@info(name = 'query1') " +
"from cseEventStream[volume < 150] " +
"select symbol,price " +
"insert into outputStream ;";
// Generating runtime
SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
// Adding callback to retrieve output events from query
siddhiAppRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
// EventPrinter.print(timeStamp, inEvents, removeEvents);
System.out.print(inEvents[0].getData(0) + " ");
}
});
// Retrieving InputHandler to push events into Siddhi
InputHandler inputHandler = siddhiAppRuntime.getInputHandler("cseEventStream");
// Starting event processing
siddhiAppRuntime.start();
// Sending events to Siddhi
inputHandler.send(new Object[]{"Welcome", 700f, 100L});
inputHandler.send(new Object[]{"WSO2", 60.5f, 200L});
inputHandler.send(new Object[]{"to", 50f, 30L});
inputHandler.send(new Object[]{"IBM", 76.6f, 400L});
inputHandler.send(new Object[]{"siddhi!", 45.6f, 50L});
Thread.sleep(500);
// Shutting down the runtime
siddhiAppRuntime.shutdown();
// Shutting down Siddhi
siddhiManager.shutdown();
}
}



基于上面的例子,很容易对CEP有个直观的理解,首先定义一个事件流cseEventStream,通过程序不断向这个流中插入事件,同时通过对流中事件的实时处理,过滤调volume < 150的事件,并且插入到outputStream中,通过关联的回调函数对outputStream流中的事件在做进一步处理,整个处理过程完全使用类SQL面向问题方式,不需要针对性的开发过程式处理程序。



尽管CEP有多种不同实现,但总体上讲CEP的特点是通过类SQL、DSL等方式完成对业务逻辑处理的描述,而非开发过程式处理程序。下面再来看个例子:如果一个房间温度在10分钟之内增长超过5度,发送告警。

from every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ] within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp insert into AlertStream;




对于不熟悉CEP的人来讲,这段代码初看起来有点复杂,我们可以简单的这么理解一下,首先有一个叫做TempStream的流存放温度事件,对于每一个进入TempStream流的事件都定义一个10分钟的事件窗口(within 10 min),我们把第一个事件定义为e1,窗口期内e1之后的其他事件我们定义为e2,[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ]表示e1,e2是同一房间产生的事件并且温度增长超过5度,当满足这个条件时,将e1,e2作为一条记录插入到AlertStream流中。



有别与一般的流式处理框架,every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ] 这种独特的表现形式在CEP中被称为模式(Pattern)。



二、CEP vs 流式计算

伴随着Storm、Spark、Flink等流式计算框架的出现,CEP的热度也逐渐上升起来,尽管CEP的出现要早于Storm、Spark、Flink很久。CEP与流式计算有很多相似之处,但两者最大的区别就是CEP是面向问题的,而Storm等流式计算框架是面向过程的,针对具体的问题需要开发大量应用程序。可以认为Storm等流式计算框架可以做为CEP的底层实现,事实上像WSO2这样的产品也就是这样实现的,也就是说当用户定义CEP Query后,WSO2进行Query解析,分解成若干算子后交由Storm集群去处理。



三、分布式CEP

前文中,我们展示了一个单实例的CEP程序并且提及流计算框架可以做为CEP的基础,通常像Storm这类流计算框架是分布式、无状态的,而CEP通常是有状态的,这里我们将进一步讨论CEP遇到分布式、无状态的流计算会遇到哪些问题以及如何处理:



  1. 无状态处理,Filter这类算子是典型的无状态处理,只对输入流中的数据进行过滤输出到其他流中。

  2. 有状态处理,Window、Sequence、Pattern这类算子,需要对事件集合进行处理的就涉及到状态存储问题。

  3. 分区,为了提高处理性能,通常分布式环境下采用多任务进行处理,流中的事件交由哪个任务处理也是一个问题。



前文中我们提到WSO2基于Storm,接下来以“多种类型的厨房设备将自身设备状态发送给CEP,检测到异常后发送告警通知。”的例子来介绍WSO2如何与Storm结合。





Step1,首先定义一个通过Http形式接受事件的流(DevicePowerStream),DevicePowerStream的并行度是3,可以理解为DevicePowerStream对应一个Strom Bolt,Bolt的并行度是3。

@source(type = ‘http’, receiver.url=‘ ‘, topic = ‘device-power’, @dist(parallel =‘3’) @map(type = ‘json’))
define stream DevicePowerStream (type string, deviceID string, power int);



Step2, 当DevicePowerStream收到数据后,业务上只关注type=‘monitored’的设备数据,所以DevicePowerStream流用于过滤type=‘monitored’的数据。

@info(name = ‘monitored-filter’) @dist(execGroup=‘group1’, parallel =‘3’)
from DevicePowerStream[type == ‘monitored’]
select deviceID, power
insert current events into MonitoredDevicesPowerStream;



Step3, 根据deviceID对MonitoredDevicesPowerStream中的数据做分组,同时定义一个2min的时间窗口,并计算窗口期内power的平均值,写入到AvgPowerStream中,对于AvgPowerStream流的处理则采用了前面提到的CEP Pattern技术,10分钟内出现两次事件值的增幅大于5则认为是异常事件。

@info(name = ‘power-increase-pattern’) @dist(execGroup=‘group2’, parallel =‘3’)
partition with (deviceID of MonitoredDevicesPowerStream)
begin
@info(name = ‘avg-calculator’)
from MonitoredDevicesPowerStream#window.time(2 min) select deviceID, avg(power) as avgPower
insert current events into #AvgPowerStream;

@info(name = ‘power-increase-detector’)
from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower
insert current events into RisingPowerStream;
end;



Step4,而后对异常事件在进行后续过滤处理、发送告警,这里不在详细解释。



从上面的图中可以看到,流之间的数据传递使用了Kafka而非Storm Bolt之间数据的简单传递。这里应用Kafka做为持久化存储主要解决以下问题:

  1. 流之间消费速度不同所产生的背压问题。

  2. 通过外部存储,实现流处理任务无本地状态,可以扩展并行度。

  3. 存储流窗口期数据。



完整示例:



@source(type = 'http', receiver.url=' ', topic = 'device-power', @dist(parallel ='3') @map(type = 'json'))
define stream DevicePowerStream (type string, deviceID string, power int);


@sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}',
@map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))
define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string);


@Store(type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/sp", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver",field.length="symbol:100")
define table DeviceIdInfoTable (deviceID string, roomID string, autorityContactEmail string);


@info(name = 'monitored-filter') @dist(execGroup='group1', parallel ='3')
from DevicePowerStream[type == 'monitored']
select deviceID, power
insert current events into MonitoredDevicesPowerStream;


@info(name = 'power-increase-pattern') @dist(execGroup='group2', parallel ='3')
partition with (deviceID of MonitoredDevicesPowerStream)
begin
@info(name = 'avg-calculator')
from MonitoredDevicesPowerStream#window.time(2 min) select deviceID, avg(power) as avgPower
insert current events into #AvgPowerStream;
@info(name = 'power-increase-detector')
from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower
insert current events into RisingPowerStream;
end;


@info(name = 'power-range-filter') @dist(execGroup='group3', parallel ='1')
from RisingPowerStream[finalPower > 100]
select deviceID, initialPower, finalPower
insert current events into DevicesWithinRangeStream;


@info(name = 'enrich-alert')
@dist(execGroup='group3' ,parallel ='1')
from DevicesWithinRangeStream as s join DeviceIdInfoTable as t
on s.deviceID == t.deviceID
select s.deviceID as deviceID, t.roomID as roomID, s.initialPower as initialPower, s.finalPower as finalPower, t.autorityContactEmail as autorityContactEmail insert current events into AlertStream;




四、总结

值得一提的是,随着流计算的兴起,出现了很多基于流计算开发的类SQL系统,简化流计算开发,比如Kafka KSQL。相比于CEP,这类系统普遍实现了Window算子,但却缺少Sequence、Pattern等高级算子,或者不久之后也会实现。



CEP在IOT、量化交易、风控等多个领域有着广泛的应用,相信以后应用会越来越广,拭目以待吧。



发布于: 2020 年 08 月 02 日 阅读数: 184
用户头像

星际行者

关注

编程多年依旧热爱。。。 2019.03.28 加入

还未添加个人简介

评论

发布
暂无评论
复杂事件处理简介