Presto Design and Implementation (7): Event

Author: 冰心的小屋
  • 2023-08-23
    Beijing

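I previously worked at TalkingData on the design and development of a mobile ad-tracking product. Every day we had to process large volumes of ad click, impression, download, registration, and login data, and every record had to go through the following steps: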

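  • Write a log entry before and after processing;

  • Use Redis to increment the daily record count for each data type;

  • If an exception occurs during processing, report the record to the alarm center, which decides whether to raise an alarm based on how often the exception occurs;

  • For click data, extract the key identifiers carried in the click and write them to Cassandra for later attribution analysis.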


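When implementing this, keep the dependencies between steps to a minimum so that the code stays simple and easy to maintain. The observer pattern is a good fit for this kind of decoupling: each dispatch handler listens only for the events it cares about. We do not have to start from scratch either; Airlift Event gives us a starting point.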

1. Airlift Event

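With Airlift Event, the design looks like this:

  • EventService: processes the ad data; processing produces events, which are then pushed to MultiEventClient;

  • MultiEventClient: plays the role of the Subject in the observer pattern and notifies the Observers whenever a new event arrives;

  • Concrete EventClient implementations: play the role of the Observers; they first register with the Subject and then listen for new events.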
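
The roles above can be illustrated without Airlift. The following is a minimal, dependency-free sketch of the Subject/Observer relationship. DemoEventClient, DemoMultiEventClient, and RecordingClient are hypothetical names invented for this illustration; they are not Airlift classes and only mimic the fan-out idea.

```java
import java.util.ArrayList;
import java.util.List;

// Observer: receives every event published by the subject.
interface DemoEventClient {
    void post(Object event);
}

// Subject: keeps the registered observers and fans each event out to all of them.
final class DemoMultiEventClient implements DemoEventClient {
    private final List<DemoEventClient> clients = new ArrayList<>();

    void register(DemoEventClient client) {
        clients.add(client);
    }

    @Override
    public void post(Object event) {
        for (DemoEventClient client : clients) {
            client.post(event);
        }
    }
}

// A trivial observer that records what it has seen.
final class RecordingClient implements DemoEventClient {
    final String name;
    final List<String> seen = new ArrayList<>();

    RecordingClient(String name) {
        this.name = name;
    }

    @Override
    public void post(Object event) {
        seen.add(name + ":" + event);
    }
}

final class ObserverSketch {
    // Registers two observers, publishes one event, and returns what each observer saw.
    static List<String> run() {
        DemoMultiEventClient subject = new DemoMultiEventClient();
        RecordingClient log = new RecordingClient("log");
        RecordingClient alarm = new RecordingClient("alarm");
        subject.register(log);
        subject.register(alarm);

        subject.post("PROCESSING");

        List<String> all = new ArrayList<>(log.seen);
        all.addAll(alarm.seen);
        return all;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [log:PROCESSING, alarm:PROCESSING]
    }
}
```

The publisher only knows the subject, so adding a new observer requires no change to the code that produces events.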

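1.1 Defining the ad data and the processing service

  • AdvertData: simulated ad data;

  • AdvertDataService: the interface for processing ad data;

  • AdvertDataServiceImpl: the implementation of that interface.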

// 1. Ad data (the Lombok annotations generate the getter, the constructor, and toString)
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;

@Getter
@AllArgsConstructor
@ToString
public class AdvertData {
    private Type type;

    public enum Type {
        CLICK, IMPRESSION, INSTALL, REGISTER
    }
}

// 2. Service that processes ad data
public interface AdvertDataService {
    void process(AdvertData advertData);
}

// 3. Implementation of the interface above
import com.google.inject.Inject;
import io.airlift.event.client.MultiEventClient;

import java.util.concurrent.atomic.AtomicInteger;

public class AdvertDataServiceImpl implements AdvertDataService {
    private final MultiEventClient multiEventClient;

    // Counter used to simulate failures: every even-numbered call fails
    private final AtomicInteger count = new AtomicInteger();

    @Inject
    public AdvertDataServiceImpl(MultiEventClient multiEventClient) {
        this.multiEventClient = multiEventClient;
    }

    @Override
    public void process(AdvertData advertData) {
        // 1. Simulate the "processing" event
        multiEventClient.post(new AdvertEvent(AdvertEventType.PROCESSING, advertData));

        // 2. Simulate a failure event
        int value = count.getAndIncrement();
        if ((value % 2) == 0) {
            try {
                throw new IllegalArgumentException("AdvertData's id is invalid");
            } catch (IllegalArgumentException e) {
                AdvertEvent errorEvent = new AdvertEvent(AdvertEventType.ERROR, advertData);
                errorEvent.put(AdvertEvent.ERROR_KEY, e.getMessage());
                multiEventClient.post(errorEvent);
                return;
            }
        }

        // 3. Simulate the "processed" event
        multiEventClient.post(new AdvertEvent(AdvertEventType.PROCESSED, advertData));
    }
}

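1.2 Defining the event

  • AdvertEventType: the event type, where PROCESSING = in progress, ERROR = failed, and PROCESSED = done;

  • AdvertEvent: the event itself, which holds the event type, the original ad data, and extension properties. If the event is to be serialized to JSON, mark the event class with the @EventType annotation and each event property with the @EventField annotation.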

// 1. Event types
public enum AdvertEventType {
    // In progress
    PROCESSING,
    // Processing failed
    ERROR,
    // Processing finished
    PROCESSED
}

// 2. The event
import io.airlift.event.client.EventField;
import io.airlift.event.client.EventType;

import java.util.HashMap;
import java.util.Map;

@EventType("advert")
public class AdvertEvent {
    public static final String ERROR_KEY = "error";

    private final AdvertEventType type;
    private final AdvertData source;
    private final Map<String, Object> properties = new HashMap<>();

    public AdvertEvent(AdvertEventType type, AdvertData source) {
        this.type = type;
        this.source = source;
    }

    @EventField
    public AdvertEventType getType() {
        return type;
    }

    @EventField
    public AdvertData getSource() {
        return source;
    }

    @EventField
    public Map<String, Object> getProperties() {
        return properties;
    }

    // Adds an extension property; returns this so calls can be chained
    public AdvertEvent put(String key, Object value) {
        properties.put(key, value);
        return this;
    }

    // Reads an extension property; the caller chooses the expected type
    @SuppressWarnings("unchecked")
    public <T> T getValue(String key) {
        return (T) properties.get(key);
    }
}
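
To give a feel for what annotation-marked getters make possible, here is a simplified sketch that uses reflection to collect the values of annotated getters into a map, which a serializer could then turn into JSON. The @DemoField annotation and every class name in it are hypothetical stand-ins. This is not Airlift's implementation, only an illustration of the general idea under those assumptions.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical marker annotation; it stands in for a field-marking annotation
// and is NOT Airlift's @EventField.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoField {
}

// Hypothetical event class with two exported getters and one that is not exported.
final class DemoEvent {
    private final String type;
    private final String source;

    DemoEvent(String type, String source) {
        this.type = type;
        this.source = source;
    }

    @DemoField
    public String getType() {
        return type;
    }

    @DemoField
    public String getSource() {
        return source;
    }

    // Not annotated, so the extractor skips it.
    public String getInternalNote() {
        return "ignored";
    }
}

final class AnnotatedFieldSketch {
    // Collects "propertyName -> value" for every annotated, zero-argument getter.
    static Map<String, Object> extract(Object event) {
        Map<String, Object> fields = new TreeMap<>();
        for (Method method : event.getClass().getMethods()) {
            String name = method.getName();
            boolean isGetter = name.startsWith("get") && name.length() > 3
                    && method.getParameterCount() == 0;
            if (!isGetter || !method.isAnnotationPresent(DemoField.class)) {
                continue;
            }
            // getType -> type
            String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            try {
                fields.put(property, method.invoke(event));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot read " + property, e);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(extract(new DemoEvent("ERROR", "CLICK")));
    }
}
```

Only the getters carrying the marker end up in the map, which is why unannotated helper methods such as put and getValue on AdvertEvent do not need to be hidden from serialization.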

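1.3 Defining the concrete EventClients

You can extend the AbstractEventClient class provided by Airlift, which requires implementing one generic method. Here I add a thin wrapper on top of it: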

// AdvertData and AdvertEventType are assumed to live in com.ice.event alongside AdvertEvent
import com.ice.event.AdvertData;
import com.ice.event.AdvertEvent;
import com.ice.event.AdvertEventType;
import io.airlift.event.client.AbstractEventClient;

import java.io.IOException;

// 1. Subclasses extend AbstractAdvertEventClient so that handling is more uniform
public abstract class AbstractAdvertEventClient extends AbstractEventClient {
    @Override
    protected <T> void postEvent(T event) throws IOException {
        // 1. Ignore anything that is not an AdvertEvent
        if (!(event instanceof AdvertEvent)) {
            return;
        }

        AdvertEvent advertEvent = (AdvertEvent) event;
        // 2. Skip events that the subclass does not support
        if (!support(advertEvent)) {
            return;
        }

        // 3. All checks passed; hand the event to the subclass
        postEvent(advertEvent);
    }

    // Subclasses override this to filter events; by default every event is accepted
    protected boolean support(AdvertEvent advertEvent) {
        return true;
    }

    protected abstract void postEvent(AdvertEvent advertEvent) throws IOException;
}

// 2. Log client: handles all three event types
public class LogEventClient extends AbstractAdvertEventClient {
    @Override
    protected void postEvent(AdvertEvent advertEvent) throws IOException {
        AdvertEventType type = advertEvent.getType();
        if (type == AdvertEventType.PROCESSING) {
            System.out.println("[LOG]Processing message: data=" + advertEvent.getSource());
        } else if (type == AdvertEventType.PROCESSED) {
            System.out.println("[LOG]Processed message: data=" + advertEvent.getSource());
        } else if (type == AdvertEventType.ERROR) {
            System.out.printf("[LOG]Error message: data=%s, error=%s %n",
                    advertEvent.getSource(), advertEvent.getValue(AdvertEvent.ERROR_KEY));
        }
    }
}

// 3. Redis client: counts PROCESSING events only
public class RedisEventClient extends AbstractAdvertEventClient {
    @Override
    protected boolean support(AdvertEvent advertEvent) {
        return advertEvent.getType() == AdvertEventType.PROCESSING;
    }

    @Override
    protected void postEvent(AdvertEvent advertEvent) throws IOException {
        System.out.println("[Redis]Increased event count: " + advertEvent.getSource());
    }
}

// 4. Click client: handles PROCESSING events whose data type is CLICK
public class ClickEventClient extends AbstractAdvertEventClient {
    @Override
    protected boolean support(AdvertEvent advertEvent) {
        boolean isProcessing = advertEvent.getType() == AdvertEventType.PROCESSING;
        if (!isProcessing) {
            return false;
        }

        return advertEvent.getSource().getType() == AdvertData.Type.CLICK;
    }

    @Override
    protected void postEvent(AdvertEvent advertEvent) throws IOException {
        System.out.println("[CLICK]Inserted cassandra: " + advertEvent.getSource());
    }
}

// 5. Alarm client: handles ERROR events only
public class AlarmEventClient extends AbstractAdvertEventClient {
    @Override
    protected boolean support(AdvertEvent advertEvent) {
        return advertEvent.getType() == AdvertEventType.ERROR;
    }

    @Override
    protected void postEvent(AdvertEvent advertEvent) throws IOException {
        // printf needs %s here; the original "{}" placeholder is SLF4J syntax and would be printed literally
        System.out.printf("[Alarm]Sent error: data=%s, message=%s %n",
                advertEvent.getSource(), advertEvent.getValue(AdvertEvent.ERROR_KEY));
    }
}

1.4 Writing the module

import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.ice.event.client.AlarmEventClient;
import com.ice.event.client.ClickEventClient;
import com.ice.event.client.LogEventClient;
import com.ice.event.client.RedisEventClient;
import io.airlift.event.client.EventClient;
import io.airlift.event.client.EventModule;

import static com.google.inject.multibindings.Multibinder.newSetBinder;

public class AdvertModule implements Module {
    @Override
    public void configure(Binder binder) {
        // 1. Prerequisite: install EventModule
        binder.install(new EventModule());

        // 2. Bind each EventClient implementation and also add it to the set used by MultiEventClient
        Class<? extends EventClient>[] clsArray = client(
                LogEventClient.class,
                RedisEventClient.class,
                ClickEventClient.class,
                AlarmEventClient.class);

        for (Class<? extends EventClient> cls : clsArray) {
            binder.bind(cls).in(Scopes.SINGLETON);
            newSetBinder(binder, EventClient.class).addBinding().to(Key.get(cls)).in(Scopes.SINGLETON);
        }

        // 3. Bind the AdvertDataService service
        binder.bind(AdvertDataService.class).to(AdvertDataServiceImpl.class).in(Scopes.SINGLETON);
    }

    // Helper that packs the varargs into an array of client classes
    private Class<? extends EventClient>[] client(Class<? extends EventClient>... cls) {
        return cls;
    }
}

1.5 Putting it together

import com.google.inject.Guice;
import com.google.inject.Injector;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class AdvertStart {
    public static void main(String[] args) throws Exception {
        // 1. Build the Injector
        Injector injector = Guice.createInjector(new AdvertModule());

        // 2. Get the AdvertDataService instance
        AdvertDataService advertDataService = injector.getInstance(AdvertDataService.class);

        // 3. Simulate event processing in an endless loop
        while (true) {
            for (AdvertData.Type type : AdvertData.Type.values()) {
                System.out.println("Testing: " + type);
                print('v', 32);
                advertDataService.process(new AdvertData(type));
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
                print('^', 32);
                System.out.println("Tested: " + type);
                System.out.println();
            }

            TimeUnit.SECONDS.sleep(10);
            System.out.println();
            System.out.println("************************************");
        }
    }

    // Prints a separator line made of `size` copies of `c`
    private static void print(char c, int size) {
        char[] chars = new char[size];
        Arrays.fill(chars, c);
        System.out.println(new String(chars));
    }
}


After running the program, you can see that each event is handled by the expected clients.
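
Because the routing is fully determined by the code above, it can also be summarized in a small dependency-free sketch. It assumes the support() rules from section 1.3 and the rule from section 1.1 that every even-numbered call fails. The class RoutingSketch and its method names are hypothetical, and the order of clients within each list is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

final class RoutingSketch {
    enum DataType { CLICK, IMPRESSION, INSTALL, REGISTER }

    enum EventKind { PROCESSING, ERROR, PROCESSED }

    // Which clients accept an event, mirroring the support() rules of section 1.3.
    static List<String> handlers(EventKind kind, DataType dataType) {
        List<String> result = new ArrayList<>();
        result.add("Log"); // the log client accepts every event
        if (kind == EventKind.PROCESSING) {
            result.add("Redis");
            if (dataType == DataType.CLICK) {
                result.add("Click");
            }
        }
        if (kind == EventKind.ERROR) {
            result.add("Alarm");
        }
        return result;
    }

    // Events posted by the n-th call to process() (0-based): even calls fail.
    static List<EventKind> eventsForCall(int callIndex) {
        return callIndex % 2 == 0
                ? List.of(EventKind.PROCESSING, EventKind.ERROR)
                : List.of(EventKind.PROCESSING, EventKind.PROCESSED);
    }

    public static void main(String[] args) {
        DataType[] types = DataType.values();
        for (int call = 0; call < types.length; call++) {
            for (EventKind kind : eventsForCall(call)) {
                System.out.println(types[call] + " / " + kind + " -> " + handlers(kind, types[call]));
            }
        }
    }
}
```

With four data types per round and a failure on every even call, CLICK and INSTALL always take the error path in the demo loop, while IMPRESSION and REGISTER always complete.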

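2. Other modules

  • event-http: reports event data through HttpEventClient. The reporting address must be configured via service-inventory.uri in config.properties; HttpEventClient serializes each event to JSON and then reports it;

  • http-client: built on Jetty's HttpClient; supports HTTP/1 and HTTP/2 and can create both synchronous and asynchronous clients;

  • http-server: built on Jetty's ServerConnector, which still uses NIO; supports HTTP/1 and HTTP/2.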

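3. Usage in Presto

  • event and event-http: used by Presto Verifier to push query verification events. Presto Verifier is a tool for validating the correctness of SQL queries, for example checking that the same SQL returns equal results across multiple runs, or validating results after a Presto upgrade;

  • http-client: widely used in unit tests to simulate real client requests and verify that the server handles them correctly;

  • http-server: all HTTP services provided by Presto are built on http-server.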

Originally published on InfoQ 写作社区 (InfoQ Writing Community), topic: 数据湖 (data lake).