Pulsar Functions
编程模型(Programming model)
开启 Functions
conf/bookkeeper.conf
extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
复制代码
conf/broker.conf
functionsWorkerEnabled=true
复制代码
conf/functions_worker.yml
pulsarFunctionsCluster: pulsar-cluster
numFunctionPackageReplicas: 2
复制代码
窗口(window)
windowLengthCount 每个窗口包含的消息数量
slidingIntervalCount 窗口每隔多少条消息滑动一次
windowLengthDurationMs 窗口的时间长度(毫秒)
slidingIntervalDurationMs 窗口每隔多长时间(毫秒)滑动一次
开窗函数
/**
 * Skeleton window function: receives a collection of records and walks over it
 * without acting on any of them.
 */
public class WordCountWindowFunction implements org.apache.pulsar.functions.api.WindowFunction<String, Void> {

    /**
     * Visits every record in {@code inputs}; the per-record body is empty.
     *
     * @param inputs  the records handed to this invocation
     * @param context the window context (not used by this skeleton)
     * @return always {@code null}
     */
    @Override
    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
        inputs.forEach(record -> {
            // Placeholder: per-record logic goes here.
        });
        return null;
    }
}
复制代码
运行函数
--user-config '{"windowLengthDurationMs":"60000", "slidingIntervalDurationMs":"1000"}'
--user-config '{"windowLengthDurationMs":"60000"}'
--user-config '{"windowLengthCount":"100", "slidingIntervalCount":"10"}'
--user-config '{"windowLengthCount":"100"}'
Java 编程
pom.xml
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-local-runner</artifactId>
<version>${pulsar.version}</version>
</dependency>
复制代码
WordCount
/**
 * Keeps a running count per word of the incoming sentences, using the counter
 * and state calls exposed by the function {@code Context}.
 */
public class WordCountFunction implements org.apache.pulsar.functions.api.Function<String, Void> {

    /**
     * Tokenizes {@code input} on whitespace and increments one counter per lower-cased word.
     *
     * @param input   the sentence to tokenize
     * @param context function context providing {@code getCounter}, {@code putState} and {@code incrCounter}
     * @return always {@code null}
     */
    @Override
    public Void process(String input, Context context) throws Exception {
        // Split on runs of whitespace so repeated spaces, tabs or newlines do not produce empty "words".
        for (String token : input.split("\\s+")) {
            if (token.isEmpty()) {
                // Leading whitespace (or an empty input) still yields one empty token; never count it.
                continue;
            }
            // Locale.ROOT keeps the counter key independent of the JVM's default locale.
            String counterKey = token.toLowerCase(java.util.Locale.ROOT);
            if (context.getCounter(counterKey) == 0) {
                // Counter currently reads 0: store the encoded value 100 under this key before incrementing.
                context.putState(counterKey, ByteBuffer.wrap(ByteUtils.from(100)));
            }
            context.incrCounter(counterKey, 1);
        }
        return null;
    }
}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
--broker-service-url pulsar://server-101:6650 \
--jar target/cloudwise-pulsar-functions-with-dependencies.jar \
--classname com.cloudwise.quickstart.pulsar.functions.WordCountFunction \
--tenant public \
--namespace default \
--name word-count-function \
--inputs persistent://public/default/sentences \
--output persistent://public/default/wordcount
复制代码
动态路由
/**
 * The basic idea is to inspect the content of each message and route the message
 * to a different destination depending on that content.
 *
 * <p>Reads three user-config values: {@code regex}, {@code matched-topic} and
 * {@code unmatched-topic}.
 */
public class RoutingFunction implements org.apache.pulsar.functions.api.Function<String, String> {

    /**
     * Sends {@code input} to the matched topic when the whole input matches the
     * configured regex, otherwise to the unmatched topic.
     *
     * @param input   the message payload to test against the regex
     * @param context function context used to read user config and publish the message
     * @return always {@code null}; the message is published explicitly via {@code newOutputMessage}
     * @throws IllegalArgumentException if any of the three user-config values is absent
     */
    @Override
    public String process(String input, Context context) throws Exception {
        String regex = requiredConfig(context, "regex");
        String matchedTopic = requiredConfig(context, "matched-topic");
        String unmatchedTopic = requiredConfig(context, "unmatched-topic");

        // Matcher.matches() only succeeds when the entire input matches the pattern.
        boolean matched = Pattern.compile(regex).matcher(input).matches();
        String destination = matched ? matchedTopic : unmatchedTopic;
        context.newOutputMessage(destination, Schema.STRING).value(input).send();
        return null;
    }

    /**
     * Unwraps a user-config value. getUserConfigValue returns an Optional, so calling
     * toString() on it directly would yield "Optional[...]" instead of the configured text.
     */
    private static String requiredConfig(Context context, String key) {
        return context.getUserConfigValue(key)
                .map(Object::toString)
                .orElseThrow(() -> new IllegalArgumentException("Missing required user config: " + key));
    }
}
复制代码
log-topic
/**
 * Writes one log line per incoming message through the logger obtained from the
 * function {@code Context}: a warning when the payload contains "danger", an info line otherwise.
 */
public class LoggingFunction implements org.apache.pulsar.functions.api.Function<String, Void> {

    /**
     * @param s       the message payload
     * @param context function context supplying the logger and the function id
     * @return always {@code null}
     */
    @Override
    public Void process(String s, Context context) throws Exception {
        final Logger logger = context.getLogger();
        // NOTE(review): the log text says "message", but the value comes from getFunctionId().
        final String id = context.getFunctionId();

        if (!s.contains("danger")) {
            logger.info("Message {} received\nContent: {}", id, s);
            return null;
        }

        logger.warn("A warning was received in message {}", id);
        return null;
    }
}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
--jar cloudwise-pulsar-functions-1.0.0.jar \
--classname com.cloudwise.quickstart.pulsar.functions.LoggingFunction \
--log-topic persistent://public/default/logging-function-logs
复制代码
user-config
/**
 * Logs the user-config value stored under {@code word-of-the-day}, or a warning
 * when no such value was supplied.
 */
public class UserConfigFunction implements org.apache.pulsar.functions.api.Function<String, Void> {

    /**
     * @param s       the message payload (not used)
     * @param context function context supplying the logger and the user config
     * @return always {@code null}
     */
    @Override
    public Void process(String s, Context context) throws Exception {
        Logger log = context.getLogger();
        Optional<Object> wordOfTheDay = context.getUserConfigValue("word-of-the-day");
        if (wordOfTheDay.isPresent()) {
            // Log the unwrapped value; passing the Optional itself would print "Optional[...]".
            log.info("The word of the day is {}", wordOfTheDay.get());
        } else {
            log.warn("No word of the day provided");
        }
        return null;
    }
}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
--broker-service-url pulsar://server-101:6650 \
--jar target/cloudwise-pulsar-functions-with-dependencies.jar \
--classname com.cloudwise.quickstart.pulsar.functions.UserConfigFunction \
--tenant public \
--namespace default \
--name user-config-function \
--inputs persistent://public/default/userconfig \
--user-config '{"word-of-the-day":"verdure"}'
复制代码
更多福利
云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维管理平台 OMP(Operation Management Platform),具备 纳管、部署、监控、巡检、自愈、备份、恢复 等功能,可为用户提供便捷的运维能力和业务管理,在提高运维人员工作效率的同时,极大提升了业务的连续性和安全性。点击下方地址链接,欢迎大家给 OMP 点赞送 star,了解更多相关内容~
GitHub 地址:https://github.com/CloudWise-OpenSource/OMP
Gitee 地址:https://gitee.com/CloudWise/OMP
微信扫描识别下方二维码,备注【OMP】加入 AIOps 社区运维管理平台 OMP 开发者交流群,与更多行业大佬一起交流学习~
系列阅读
深入浅出Apache Pulsar(1):Pulsar vs Kafka
深入浅出Apache Pulsar(2):Pulsar消息机制
深入浅出 Apache Pulsar(3):Pulsar Schema
评论