
Apache Pulsar Explained (4): Pulsar Functions

  • January 24, 2022
  • Word count: 2,882 (Chinese original)

    Estimated reading time: about 9 minutes


Pulsar Functions

Programming model

Enabling Functions

  1. conf/bookkeeper.conf: enable the stream storage (table service) that backs function state


extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent


  2. conf/broker.conf: run the functions worker inside the broker


functionsWorkerEnabled=true


  3. conf/functions_worker.yml: set the Pulsar cluster name and the number of replicas kept for each function package


pulsarFunctionsCluster: pulsar-cluster
numFunctionPackageReplicas: 2

Windows

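  • windowLengthCount: the number of messages in each window

  • slidingIntervalCount: the number of messages after which the window slides

  • windowLengthDurationMs: the duration of each window, in milliseconds

  • slidingIntervalDurationMs: the time after which the window slides, in milliseconds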
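To make the count-based parameters concrete, here is a small plain-Java model (no Pulsar dependency) of how count windows could be formed. It assumes a simplified trigger rule: every slidingIntervalCount new messages, the window emits the most recent windowLengthCount messages, including partial windows while the stream is still short. CountWindowSketch and its windows method are hypothetical illustrations, not Pulsar's windowing implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of count-based windows (not Pulsar's implementation):
// every `slidingIntervalCount` new messages, emit the latest
// `windowLengthCount` messages (fewer while the stream is still short).
class CountWindowSketch {

    static <T> List<List<T>> windows(List<T> messages, int windowLengthCount, int slidingIntervalCount) {
        List<List<T>> result = new ArrayList<>();
        // `seen` is the number of messages received when a window fires
        for (int seen = slidingIntervalCount; seen <= messages.size(); seen += slidingIntervalCount) {
            int from = Math.max(0, seen - windowLengthCount);
            result.add(new ArrayList<>(messages.subList(from, seen)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> messages = List.of(1, 2, 3, 4, 5, 6);
        // Sliding: 4 messages per window, slide every 2 messages
        System.out.println(windows(messages, 4, 2)); // [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
        // Tumbling: slide equals the window length
        System.out.println(windows(messages, 3, 3)); // [[1, 2, 3], [4, 5, 6]]
    }
}
```

With a sliding interval smaller than the window length, consecutive windows overlap (a sliding window); when the two are equal, every message lands in exactly one window (a tumbling window).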

Window function

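import java.util.Collection;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;

public class WordCountWindowFunction implements WindowFunction<String, Void> {
    @Override
    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
        // `inputs` holds every message of the current window
        for (Record<String> input : inputs) {
            // Process each message here; input.getValue() returns its payload
        }
        return null;
    }
}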

Running the window function

  • Time-based sliding window


--user-config '{"windowLengthDurationMs":"60000", "slidingIntervalDurationMs":"1000"}'


  • Time-based tumbling window


--user-config '{"windowLengthDurationMs":"60000"}'


  • Count-based sliding window


--user-config '{"windowLengthCount":"100", "slidingIntervalCount":"10"}'


  • Count-based tumbling window


--user-config '{"windowLengthCount":"100"}'
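The time-based variants can be modeled in the same spirit. The sketch below assumes, for illustration only, that windows are aligned to multiples of the slide interval and cover [start, start + windowLengthDurationMs); TimeWindowSketch and windowStartsFor are hypothetical names, and the tumbling case is modeled by passing a slide equal to the window length. It shows why the sliding settings above (60000 ms windows every 1000 ms) put each message into 60 overlapping windows, while the tumbling settings put it into exactly one.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of time-based windows (not Pulsar's implementation):
// windows are assumed to start at multiples of the slide interval and to
// cover [start, start + windowLengthMs).
class TimeWindowSketch {

    // Returns the start times of every window that contains timestampMs
    static List<Long> windowStartsFor(long timestampMs, long windowLengthMs, long slidingIntervalMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp
        long latestStart = Math.floorDiv(timestampMs, slidingIntervalMs) * slidingIntervalMs;
        // Walk backwards while the window still reaches the timestamp
        for (long start = latestStart; start > timestampMs - windowLengthMs; start -= slidingIntervalMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long timestampMs = 125_500L;
        // Sliding: 60000 ms windows every 1000 ms
        System.out.println(windowStartsFor(timestampMs, 60_000L, 1_000L).size()); // 60
        // Tumbling: slide equal to the window length
        System.out.println(windowStartsFor(timestampMs, 60_000L, 60_000L)); // [120000]
    }
}
```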

Java programming

pom.xml


<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-api</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-local-runner</artifactId>
    <version>${pulsar.version}</version>
</dependency>


  1. WordCount


import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WordCountFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) throws Exception {
        // Split on any run of whitespace so repeated spaces do not yield empty "words"
        for (String word : input.split("\\s+")) {
            if (!word.isEmpty()) {
                // incrCounter keeps a per-word count in the function's state storage
                context.incrCounter(word.toLowerCase(), 1);
            }
        }
        return null;
    }
}

$ $PULSAR_HOME/bin/pulsar-admin functions create \
  --broker-service-url pulsar://server-101:6650 \
  --jar target/cloudwise-pulsar-functions-with-dependencies.jar \
  --classname com.cloudwise.quickstart.pulsar.functions.WordCountFunction \
  --tenant public \
  --namespace default \
  --name word-count-function \
  --inputs persistent://public/default/sentences \
  --output persistent://public/default/wordcount
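Unlike a window function, WordCountFunction keeps its counts in Pulsar's state storage, so they accumulate across all messages rather than per window. The plain-Java sketch below mimics that behavior with a hypothetical in-memory InMemoryCounters class whose incrCounter and getCounter methods are named after the Context methods and whose process method roughly mirrors the word splitting of the WordCount example. It is only a model for local experimentation: unlike real function state, it loses everything when the process exits, and returning 0 for unknown keys is a choice of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for function state; real Pulsar counters
// are persisted by the state storage, this map is not.
class InMemoryCounters {
    private final Map<String, Long> counters = new HashMap<>();

    void incrCounter(String key, long amount) {
        counters.merge(key, amount, Long::sum);
    }

    // In this model an unknown key simply has count 0
    long getCounter(String key) {
        return counters.getOrDefault(key, 0L);
    }

    // Roughly mirrors the per-message logic of the WordCount example
    void process(String input) {
        for (String word : input.split("\\s+")) {
            if (!word.isEmpty()) {
                incrCounter(word.toLowerCase(), 1);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryCounters state = new InMemoryCounters();
        state.process("Hello world");
        state.process("hello Pulsar");
        System.out.println(state.getCounter("hello")); // 2
        System.out.println(state.getCounter("kafka")); // 0
    }
}
```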


  2. Dynamic routing


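import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

/**
 * Basic idea: inspect the content of each message and route it to a different destination based on that content.
 */
public class RoutingFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        // getUserConfigValue returns Optional<Object>; unwrap it (fails fast if the key is missing)
        String regex = context.getUserConfigValue("regex").get().toString();
        String matchedTopic = context.getUserConfigValue("matched-topic").get().toString();
        String unmatchedTopic = context.getUserConfigValue("unmatched-topic").get().toString();
        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(input);
        // matches() requires the entire message to match the regex
        if (matcher.matches()) {
            context.newOutputMessage(matchedTopic, Schema.STRING).value(input).send();
        } else {
            context.newOutputMessage(unmatchedTopic, Schema.STRING).value(input).send();
        }
        // Returning null writes nothing to the function's own output topic
        return null;
    }
}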
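The routing decision itself does not depend on Pulsar, so it can be pulled out and tested locally. The sketch below is a hypothetical TopicRouter helper (the regex and topic names in main are made up for illustration) that compiles the pattern once instead of on every message. Note that Matcher.matches() only succeeds when the whole message matches the regex; Matcher.find() would accept a match anywhere inside it.

```java
import java.util.regex.Pattern;

// Hypothetical helper that isolates the routing decision of RoutingFunction
// so it can be unit-tested without a Pulsar cluster.
class TopicRouter {
    private final Pattern pattern;
    private final String matchedTopic;
    private final String unmatchedTopic;

    TopicRouter(String regex, String matchedTopic, String unmatchedTopic) {
        this.pattern = Pattern.compile(regex); // compiled once, reused for every message
        this.matchedTopic = matchedTopic;
        this.unmatchedTopic = unmatchedTopic;
    }

    // matches() requires the whole message to match; find() would accept a substring match
    String route(String message) {
        return pattern.matcher(message).matches() ? matchedTopic : unmatchedTopic;
    }

    public static void main(String[] args) {
        TopicRouter router = new TopicRouter("order-\\d+",
                "persistent://public/default/orders", "persistent://public/default/others");
        System.out.println(router.route("order-42"));     // persistent://public/default/orders
        System.out.println(router.route("new order-42")); // persistent://public/default/others
    }
}
```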


  3. log-topic (publishing the function's log output to a topic)


import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class LoggingFunction implements Function<String, Void> {
    @Override
    public Void process(String s, Context context) throws Exception {
        Logger LOG = context.getLogger();
        // getFunctionId() identifies the function, not the message;
        // use the sequence of the current input record to identify the message
        String messageId = context.getCurrentRecord().getRecordSequence()
                .map(Object::toString)
                .orElse("unknown");
        if (s.contains("danger")) {
            LOG.warn("A warning was received in message {}", messageId);
        } else {
            LOG.info("Message {} received\nContent: {}", messageId, s);
        }
        return null;
    }
}

$ $PULSAR_HOME/bin/pulsar-admin functions create \
  --jar cloudwise-pulsar-functions-1.0.0.jar \
  --classname com.cloudwise.quickstart.pulsar.functions.LoggingFunction \
  --log-topic persistent://public/default/logging-function-logs


  4. user-config (passing user-defined configuration to a function)


import java.util.Optional;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class UserConfigFunction implements Function<String, Void> {
    @Override
    public Void process(String s, Context context) throws Exception {
        Logger log = context.getLogger();
        Optional<Object> value = context.getUserConfigValue("word-of-the-day");
        if (value.isPresent()) {
            // Log the unwrapped value; logging the Optional itself prints "Optional[...]"
            log.info("The word of the day is {}", value.get());
        } else {
            log.warn("No word of the day provided");
        }
        return null;
    }
}

$ $PULSAR_HOME/bin/pulsar-admin functions create \
  --broker-service-url pulsar://server-101:6650 \
  --jar target/cloudwise-pulsar-functions-with-dependencies.jar \
  --classname com.cloudwise.quickstart.pulsar.functions.UserConfigFunction \
  --tenant public \
  --namespace default \
  --name user-config-function \
  --inputs persistent://public/default/userconfig \
  --user-config '{"word-of-the-day":"verdure"}'
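Context.getUserConfigValue returns an Optional<Object> rather than the raw value, which is easy to trip over when logging or calling toString(). The plain-Java sketch below imitates that lookup with a hypothetical Map-backed helper (UserConfigSketch is not a Pulsar class) to show the difference between the Optional and its unwrapped value, and how to fall back to a default when the key is missing.

```java
import java.util.Map;
import java.util.Optional;

class UserConfigSketch {

    // Hypothetical stand-in for Context.getUserConfigValue, which also returns Optional<Object>
    static Optional<Object> getUserConfigValue(Map<String, Object> userConfig, String key) {
        return Optional.ofNullable(userConfig.get(key));
    }

    public static void main(String[] args) {
        Map<String, Object> userConfig = Map.of("word-of-the-day", "verdure");
        Optional<Object> value = getUserConfigValue(userConfig, "word-of-the-day");
        System.out.println(value);       // Optional[verdure]  (the wrapper, not the word)
        System.out.println(value.get()); // verdure
        // Missing key: fall back to a default instead of calling get()
        String fallback = getUserConfigValue(Map.of(), "word-of-the-day")
                .map(Object::toString)
                .orElse("no word of the day");
        System.out.println(fallback);    // no word of the day
    }
}
```

Inside a real function, Context also offers getUserConfigValueOrDefault(key, defaultValue) for the fallback case.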

More goodies

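CloudWise has open-sourced OMP (Operation Management Platform), a lightweight, aggregated, intelligent operations management platform that covers resource management, deployment, monitoring, inspection, self-healing, backup, and recovery. It gives users convenient operations and business management capabilities, making operations staff more efficient while greatly improving business continuity and security. Follow the links below to star OMP and learn more.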


GitHub: https://github.com/CloudWise-OpenSource/OMP


Gitee: https://gitee.com/CloudWise/OMP




In this series

Apache Pulsar Explained (1): Pulsar vs Kafka

Apache Pulsar Explained (2): Pulsar Messaging

Apache Pulsar Explained (3): Pulsar Schema


Full-stack intelligent business operations service provider, joined 2021.03.10

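Guided by our mission, Make Digital Online, we are committed to continuously empowering enterprises' digital transformation and improving IT operations efficiency through advanced products and technology. https://www.cloudwise.com/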
