写点什么

kafka 数据同步到 mysql

  • 2025-09-01
    北京
  • 本文字数:1934 字

    阅读完需:约 6 分钟

本文分享自天翼云开发者社区《kafka数据同步到mysql》,作者:刘****猛

kafka 安装

使用 docker-compose 进行安装,docker-compose 文件如下:

version: '2'services:  zookeeper:    image: wurstmeister/zookeeper    ports:      - "2181:2181"    networks:      proxy:        ipv4_address: 172.16.0.8  kafka:    image: wurstmeister/kafka:latest    ports:      - "9092:9092"    environment:      KAFKA_ADVERTISED_HOST_NAME: 192.168.183.142      KAFKA_CREATE_TOPICS: "test:1:1"      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181    networks:      proxy:        ipv4_address: 172.16.0.9networks:  proxy:    ipam:      config:        - subnet: 172.16.0.0/24
复制代码

这样安装的 kakfa 是没有密码的,下面为 kafka 配置密码

先将 kafka 的配置文件映射到本机目录

docker cp 277:/opt/kafka/config /root/docker-build/kafka/config/docker cp 277:/opt/kafka/bin /root/docker-build/kafka/bin/
复制代码
添加密码

然后将容器删除 

docker-compose down
复制代码

修改 config 目录下的 server.properties

############################# Server Basics #############################broker.id=-1listeners=SASL_PLAINTEXT://192.168.183.137:9092advertised.listeners=SASL_PLAINTEXT://192.168.183.137:9092security.inter.broker.protocol=SASL_PLAINTEXTsasl.enabled.mechanisms=PLAINsasl.mechanism.inter.broker.protocol=PLAINauthorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
复制代码

修改 bin 目录下的 kafka-server-start.sh 文件,修改如下



重新启动 kafka,修改 docker-compose.yml 文件如下

version: '2'services:  zookeeper:    image: wurstmeister/zookeeper    ports:      - "2181:2181"    networks:      proxy:        ipv4_address: 172.16.0.8  kafka:    image: wurstmeister/kafka:latest    ports:      - "9092:9092"    environment:      KAFKA_ADVERTISED_HOST_NAME: 192.168.183.142      KAFKA_CREATE_TOPICS: "test:1:1"      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181    volumes:        - ./config:/opt/kafka/config        - ./bin:/opt/kafka/bin    networks:      proxy:        ipv4_address: 172.16.0.9networks:  proxy:    ipam:      config:        - subnet: 172.16.0.0/24
复制代码

启动容器 

docker-compose up -d

这样把 kafka 启动起来

测试步骤

在 sp 中启动任务

sql 脚本

create table goods_source (  goods_id int,  goods_price decimal(8,2),  goods_name varchar,  goods_details varchar) WITH (  'connector' = 'kafka',  'properties.bootstrap.servers' = '101.43.164.4:9092',  'topic' = 'test_kafka',  'properties.group.id' = 'test-consumer-group-1',  'properties.security.protocol' = 'SASL_PLAINTEXT',  'properties.sasl.mechanism' = 'PLAIN',  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="******";',  'scan.startup.mode' =  'earliest-offset',  'format' =  'json'
);create table goods_target ( goods_id int, goods_price decimal(8,2), goods_name varchar, goods_details varchar, PRIMARY KEY (`goods_id`) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://101.43.164.4:3306/cdc-sink?useSSL=false&characterEncoding=utf-8', 'table-name' = 'my_goods_kafka', 'username' = 'root', 'password' = '******');insert into goods_targetselect *from goods_source;
复制代码

然后写代码,推送十条数据

这里是 java 写的推送,仅供参考

@Test    public void test1() throws ExecutionException, InterruptedException {        for (int i = 10; i <= 20; i++) {            CdcTestGoods cdcTestGoods = new CdcTestGoods();            cdcTestGoods.setGoods_id(5 + i);            cdcTestGoods.setGoods_name("iphone 14 pro max 128G  " + i);            cdcTestGoods.setGoods_details("京东618大降价,买到就是赚  " + i);            cdcTestGoods.setGoods_price(5899f);            SendResult<String, String> result = kafkaTemplate.send("test_kafka", JacksonUtils.getString(cdcTestGoods)).get();            log.info("sendMessageSync =>  {},message: {}", result, JacksonUtils.getString(cdcTestGoods));        }    }
复制代码

查看 mysql 表,出现相关内容,kafaka 只支持 insert  不支持 update



用户头像

还未添加个人签名 2022-02-22 加入

天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。

评论

发布
暂无评论
kafka数据同步到mysql_数据库复制_天翼云开发者社区_InfoQ写作社区