写点什么

深入浅出 Apache Pulsar(5)Pulsar Connectors

  • 2022 年 1 月 25 日
  • 本文字数:1405 字

    阅读完需:约 5 分钟

深入浅出 Apache Pulsar(5)Pulsar Connectors

Pulsar Connectors



消息处理(Processing guarantee)

  • at-most-once

  • at-least-once

  • effectively-once

操作流程(JDBC sink)

  1. Add a configuration file.

  2. Create a schema.

  3. Upload a schema to a topic.

  4. Create a JDBC sink

  5. Stop a JDBC sink

  6. Restart a JDBC sink

  7. Update a JDBC sink

内建连接器(Built-in connector)

Source connector

  • Canal

  • File

  • Flume

  • Kafka

  • RabbitMQ

Sink connector

  • ElasticSearch/Solr

  • Flume

  • HBase

  • HDFS2/HDFS3

  • InfluxDB

  • JDBC ClickHouse/MariaDB/PostgreSQL

  • Kafka

  • MongoDB

  • RabbitMQ

  • Redis

ClickHouse Sink

  1. 创建表

CREATE DATABASE IF NOT EXISTS monitor;CREATE TABLE IF NOT EXISTS monitor.pulsar_clickhouse_jdbc_sink(    id   UInt32,    name String) ENGINE = TinyLog;INSERT INTO monitor.pulsar_clickhouse_jdbc_sink (id, name)VALUES (1, 'tmp');SELECT *FROM monitor.pulsar_clickhouse_jdbc_sink;
复制代码
  1. 创建配置

$ vi $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml{    "userName": "sysop",    "password": "123456",    "jdbcUrl": "jdbc:clickhouse://server-101:8123/monitor",    "tableName": "pulsar_clickhouse_jdbc_sink"}
复制代码
  1. 创建 schema

$ vi $PULSAR_HOME/connectors/json-schema.json{  "name": "",  "schema": {    "type": "record",    "name": "SeedEvent",    "namespace": "com.cloudwise.quickstart.model",    "fields": [      {        "name": "id",        "type": [          "null",          "int"        ]      },      {        "name": "name",        "type": [          "null",          "string"        ]      }    ]  },  "type": "JSON",  "properties": {    "__alwaysAllowNull": "true",    "__jsr310ConversionEnabled": "false"  }}
复制代码
  1. 上传 schema

$ $PULSAR_HOME/bin/pulsar-admin schemas upload \pulsar-postgres-jdbc-sink-topic \-f $PULSAR_HOME/connectors/json-schema.json
复制代码
  1. 运行

$ $PULSAR_HOME/bin/pulsar-admin sinks create \--tenant public \--namespace default \--name pulsar-clickhouse-jdbc-sink \--inputs pulsar-clickhouse-jdbc-sink-topic \--sink-config-file $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml \--archive $PULSAR_HOME/connectors/pulsar-io-jdbc-clickhouse-2.6.2.nar \--processing-guarantees EFFECTIVELY_ONCE \--parallelism 1
复制代码

更多福利

云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维管理平台 OMP(Operation Management Platform) ,具备 纳管、部署、监控、巡检、自愈、备份、恢复 等功能,可为用户提供便捷的运维能力和业务管理,在提高运维人员等工作效率的同时,极大提升了业务的连续性和安全性。点击下方地址链接,欢迎大家给 OMP 点赞送 star,了解更多相关内容~


GitHub 地址:https://github.com/CloudWise-OpenSource/OMP


Gitee 地址:https://gitee.com/CloudWise/OMP


微信扫描识别下方二维码,备注【OMP】加入 AIOps 社区运维管理平台 OMP 开发者交流群,与更多行业大佬一起交流学习~


系列阅读

深入浅出Apache Pulsar(1):Pulsar vs Kafka

深入浅出Apache Pulsar(2):Pulsar消息机制

深入浅出 Apache Pulsar(3):Pulsar Schema

深入浅出 Apache Pulsar(4)Pulsar Functions

用户头像

全栈智能业务运维服务商 2021.03.10 加入

我们秉承Make Digital Online的使命,致力于通过先进的产品技术,为企业数字化转型和提升IT运营效率持续赋能。 https://www.cloudwise.com/

评论

发布
暂无评论
深入浅出 Apache Pulsar(5)Pulsar Connectors