Syncing Kafka Data to MySQL
Author: 天翼云 (Tianyi Cloud) Developer Community
- 2025-09-01, Beijing
Word count: 1,934
Reading time: about 6 minutes
This article is shared from the 天翼云 Developer Community post "kafka数据同步到mysql", author: 刘****猛
Installing Kafka
Install it with docker-compose; the docker-compose file is as follows:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      proxy:
        ipv4_address: 172.16.0.8
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.183.142
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    networks:
      proxy:
        ipv4_address: 172.16.0.9
networks:
  proxy:
    ipam:
      config:
        - subnet: 172.16.0.0/24
Kafka installed this way has no authentication; the steps below add password (SASL/PLAIN) protection.
First copy Kafka's config and bin directories out of the container to the host:
docker cp 277:/opt/kafka/config /root/docker-build/kafka/config/
docker cp 277:/opt/kafka/bin /root/docker-build/kafka/bin/
Add the password
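The original post shows the credential file only as an image. For SASL/PLAIN, the broker needs a JAAS file (commonly config/kafka_server_jaas.conf). The sketch below is a typical layout; the "admin" username matches the SQL script later in this post, but the password value here is an assumption, since the original masks it:

```conf
// Hypothetical kafka_server_jaas.conf for SASL/PLAIN (password is a placeholder)
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
```

The username/password pair is the broker's own inter-broker identity; each user_<name>="<password>" entry defines a client account.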
Then remove the container:
docker-compose down
Edit server.properties in the config directory:
############################# Server Basics #############################
broker.id=-1
listeners=SASL_PLAINTEXT://192.168.183.137:9092
advertised.listeners=SASL_PLAINTEXT://192.168.183.137:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
Note: with SimpleAclAuthorizer enabled, you will typically also need super.users=User:admin (or explicit ACLs) in server.properties, otherwise the admin user's requests are denied.
Edit kafka-server-start.sh in the bin directory as follows (the original shows this change as an image):
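Since the screenshot of the change is missing, here is the usual modification for SASL/PLAIN: add a KAFKA_OPTS export near the top of bin/kafka-server-start.sh so the broker JVM loads the JAAS credentials. The path below assumes the JAAS file sits in the config directory that is volume-mounted later:

```shell
# Added near the top of bin/kafka-server-start.sh:
# point the broker JVM at the SASL/PLAIN JAAS credential file
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
```

Without this line the broker starts but cannot find any SASL credentials and clients fail to authenticate.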

Restart Kafka, updating docker-compose.yml to mount the modified config and bin directories:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      proxy:
        ipv4_address: 172.16.0.8
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.183.142
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - ./config:/opt/kafka/config
      - ./bin:/opt/kafka/bin
    networks:
      proxy:
        ipv4_address: 172.16.0.9
networks:
  proxy:
    ipam:
      config:
        - subnet: 172.16.0.0/24
Start the container:
docker-compose up -d
Kafka is now up with SASL authentication enabled.
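A quick sanity check (not in the original post) is to run a console client with a client-side SASL config; without it, connections to the secured broker should fail. The file name and credentials below are assumptions matching the JAAS sketch above:

```shell
# Write a client-side SASL config (credentials assumed, matching the broker JAAS file)
cat > client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
EOF

# Then, inside the container or from a local Kafka distribution:
#   kafka-console-producer.sh --broker-list 192.168.183.137:9092 \
#     --topic test --producer.config client.properties
```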
Test steps
Start the task in sp
SQL script:
create table goods_source (
    goods_id int,
    goods_price decimal(8,2),
    goods_name varchar,
    goods_details varchar
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '101.43.164.4:9092',
    'topic' = 'test_kafka',
    'properties.group.id' = 'test-consumer-group-1',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="******";',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

create table goods_target (
    goods_id int,
    goods_price decimal(8,2),
    goods_name varchar,
    goods_details varchar,
    PRIMARY KEY (`goods_id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://101.43.164.4:3306/cdc-sink?useSSL=false&characterEncoding=utf-8',
    'table-name' = 'my_goods_kafka',
    'username' = 'root',
    'password' = '******'
);

insert into goods_target select * from goods_source;
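For reference (not spelled out in the original), since goods_source declares 'format' = 'json', each message on the test_kafka topic must be a JSON object whose keys match the column names, e.g.:

```json
{"goods_id": 15, "goods_price": 5899.0, "goods_name": "iphone 14 pro max 128G 10", "goods_details": "example details 10"}
```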
Then write code to push ten records.
Here is a Java producer, for reference only:
// Uses Spring Kafka's KafkaTemplate (autowired in the surrounding test class)
// and a Jackson helper to serialize the POJO to a JSON string.
@Test
public void test1() throws ExecutionException, InterruptedException {
    for (int i = 10; i < 20; i++) {   // ten records (i = 10 .. 19)
        CdcTestGoods cdcTestGoods = new CdcTestGoods();
        cdcTestGoods.setGoods_id(5 + i);
        cdcTestGoods.setGoods_name("iphone 14 pro max 128G " + i);
        cdcTestGoods.setGoods_details("JD 618 big sale, buy it and profit " + i);
        cdcTestGoods.setGoods_price(5899f);
        // send(...).get() blocks until the broker acknowledges the record
        SendResult<String, String> result = kafkaTemplate.send("test_kafka", JacksonUtils.getString(cdcTestGoods)).get();
        log.info("sendMessageSync => {}, message: {}", result, JacksonUtils.getString(cdcTestGoods));
    }
}
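The CdcTestGoods class is not shown in the original. A minimal POJO consistent with the setters used above would look like the sketch below; this is a hypothetical reconstruction, chosen so that Jackson's default bean serialization produces JSON keys matching the Flink table columns:

```java
// Hypothetical reconstruction of the CdcTestGoods POJO used above.
// The underscore field names mirror the goods_source/goods_target columns,
// so default Jackson serialization emits keys like "goods_id".
public class CdcTestGoods {
    private int goods_id;
    private float goods_price;
    private String goods_name;
    private String goods_details;

    public int getGoods_id() { return goods_id; }
    public void setGoods_id(int goods_id) { this.goods_id = goods_id; }
    public float getGoods_price() { return goods_price; }
    public void setGoods_price(float goods_price) { this.goods_price = goods_price; }
    public String getGoods_name() { return goods_name; }
    public void setGoods_name(String goods_name) { this.goods_name = goods_name; }
    public String getGoods_details() { return goods_details; }
    public void setGoods_details(String goods_details) { this.goods_details = goods_details; }
}
```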
Check the MySQL table: the pushed rows appear. Note that the Kafka source used here only supports insert, not update.
