写点什么

深入浅出 Apache Pulsar(3):Pulsar Schema

  • 2022 年 1 月 20 日
  • 本文字数:1514 字

    阅读完需:约 5 分钟

深入浅出Apache Pulsar(3):Pulsar Schema

Pulsar Schema

Pulsar schema enables you to use language-specific types of data when constructing and handling messages from simple types to more complex application-specific types.


  • 类型安全(序列化和反序列化)

  • Schema 帮助 Pulsar 保留了数据在其他系统中原有的含义

Schema 类型(Schema type)

  • Primitive type


Producer<String> producer = client.newProducer(Schema.STRING).create();producer.newMessage().value("Hello Pulsar!").send();Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();consumer.receive();
复制代码


  • Complex type


1. keyvalue key/value pair.


Schema<KeyValue<Integer, String>> schema = Schema.KeyValue(Schema.INT32,Schema.STRING,KeyValueEncodingType.SEPARATED);// ProducerProducer<KeyValue<Integer, String>> producer = client.newProducer(schema)    .topic(TOPIC)    .create();final int key = 100;final String value = "value-100";producer.newMessage().value(new KeyValue<>(key, value)).send();// ConsumerConsumer<KeyValue<Integer, String>> consumer = client.newConsumer(schema)    .topic(TOPIC).subscriptionName(SubscriptionName).subscribe();Message<KeyValue<Integer, String>> msg = consumer.receive();
复制代码


2.struct AVRO, JSON, and Protobuf.


Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();User user = consumer.receive();
复制代码

Schema 工作方式(How does schema work)

Producer

Consumer

Schema 管理(Schema manual management)

查询 Schema

$ $PULSAR_HOME/bin/pulsar-admin schemas \get persistent://public/default/spirit-avro-topic$ $PULSAR_HOME/bin/pulsar-admin schemas \get persistent://public/default/spirit-avro-topic \--version=2
复制代码

更新 Schema

$ $PULSAR_HOME/bin/pulsar-admin schemas upload \persistent://public/default/test-topic \--filename $PULSAR_HOME/connectors/json-schema.json
复制代码

提取 Schema

$ $PULSAR_HOME/bin/pulsar-admin schemas \extract persistent://public/default/test-topic  \--classname com.cloudwise.modal.Packet \--jar ~/cloudwise-pulsar-1.0.0-RELEASE.jar \--type jsonpublic void schemaInfo() {    System.out.println("AvroSchema:" + AvroSchema.of(SeedEvent.class).getSchemaInfo());    System.out.println("Schema.AVRO:" + Schema.AVRO(SeedEvent.class).getSchemaInfo());}
复制代码

删除 Schema

$ $PULSAR_HOME/bin/pulsar-admin schemas \delete persistent://public/default/spirit-avro-topic
复制代码

更多福利

云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维管理平台 OMP(Operation Management Platform) ,具备 纳管、部署、监控、巡检、自愈、备份、恢复 等功能,可为用户提供便捷的运维能力和业务管理,在提高运维人员等工作效率的同时,极大提升了业务的连续性和安全性。点击下方地址链接,欢迎大家给 OMP 点赞送 star,了解更多相关内容~


GitHub 地址:https://github.com/CloudWise-OpenSource/OMP


Gitee 地址:https://gitee.com/CloudWise/OMP


微信扫描识别下方二维码,备注【OMP】加入 AIOps 社区运维管理平台 OMP 开发者交流群,与更多行业大佬一起交流学习~


系列阅读

深入浅出Apache Pulsar(1):Pulsar vs Kafka

深入浅出Apache Pulsar(2):Pulsar消息机制

用户头像

全栈智能业务运维服务商 2021.03.10 加入

我们秉承Make Digital Online的使命,致力于通过先进的产品技术,为企业数字化转型和提升IT运营效率持续赋能。 https://www.cloudwise.com/

评论

发布
暂无评论
深入浅出Apache Pulsar(3):Pulsar Schema