写点什么

【大厂技术内幕】字节跳动原来是这么做数据迁移的!

  • 2021 年 11 月 11 日
  • 本文字数:10335 字

    阅读完需:约 34 分钟

5.2.1 mapper

ApArticleConfigMapper 中新增方法



ApArticleConfigMapper.xml


5.2.2 service

对文章配置操作的 service



ApArticleConfigServiceImpl 是对 ApArticleConfig 的操作



5.3 文章内容接口



5.3.1 mapper 定义

ApArticleContentMapper 新增方法


/**
 * Batch lookup of article content by article id.
 * NOTE(review): the mapper XML for this statement is not shown here, so the exact
 * matching rule is presumed from the method name; confirm against the XML.
 *
 * @param articleIds article ids, passed as strings
 * @return the content records returned by the statement
 */
List<ApArticleContent> selectByArticleIds(List<String> articleIds);

5.3.2 service

对文章内容操作的 Service


/**
 * Read-only service for article content records.
 */
public interface ApArticleContenService {

    /**
     * Batch lookup of article content.
     *
     * @param ids article ids, passed as strings
     * @return the content records found for the given ids
     */
    List<ApArticleContent> queryByArticleIds(List<String> ids);

    /**
     * Single lookup of article content. Despite the plural name, this takes one id.
     *
     * @param id article id
     * @return the content record for the article
     */
    ApArticleContent getByArticleIds(Integer id);
}


ApArticleContenServiceImpl


对 ApArticleConten 相关的操作


代码位置:com.heima.migration.service.impl.ApArticleContenServiceImpl


/**
 * Mapper-backed implementation of {@code ApArticleContenService}; both methods hand
 * their argument straight to {@code ApArticleContentMapper}.
 */
@Service
public class ApArticleContenServiceImpl implements ApArticleContenService {

    @Autowired
    private ApArticleContentMapper apArticleContentMapper;

    /**
     * Batch lookup of article content.
     *
     * @param ids article ids, passed as strings
     * @return whatever the mapper's {@code selectByArticleIds} returns
     */
    @Override
    public List<ApArticleContent> queryByArticleIds(List<String> ids) {
        final List<ApArticleContent> contents = apArticleContentMapper.selectByArticleIds(ids);
        return contents;
    }

    /**
     * Single lookup of article content.
     *
     * @param id article id
     * @return whatever the mapper's {@code selectByArticleId} returns
     */
    @Override
    public ApArticleContent getByArticleIds(Integer id) {
        final ApArticleContent content = apArticleContentMapper.selectByArticleId(id);
        return content;
    }
}


7.4 文章接口



7.4.1 mapper 定义

ApArticleMapper 新增方法


/**
 * Queries articles, using the given object as the filter.
 *
 * @param apArticle filter object; its populated properties become query conditions
 * @return the matching articles
 */
List<ApArticle> selectList(ApArticle apArticle);


/**
 * Updates the sync status of an article.
 *
 * @param apArticle carries the id of the row to update and the sync status to store
 */
void updateSyncStatus(ApArticle apArticle);


ApArticleMapper.xml


<!--
  Shared WHERE fragment for ap_article queries: each populated property of the
  parameter object contributes one equality condition.
-->
<sql id="Base_Column_Where">
    <where>
        <if test="title!=null and title!=''">
            and title = #{title}
        </if>
        <!--
          authorId is a Long. In an OGNL test a numeric 0 compares equal to '',
          so the former "authorId!=''" guard silently dropped a filter value of 0.
          A null check is the only guard a numeric property needs.
        -->
        <if test="authorId!=null">
            and author_id = #{authorId}
        </if>
        <if test="authorName!=null and authorName!=''">
            and author_name = #{authorName}
        </if>
        <!--
          NOTE(review): channelId, layout, flag and views look numeric as well. If they
          are, their "!=''" guards skip a value of 0 in the same way. Confirm the
          property types on ApArticle and drop the empty-string comparison where it applies.
        -->
        <if test="channelId!=null and channelId!=''">
            and channel_id = #{channelId}
        </if>
        <if test="channelName!=null and channelName!=''">
            and channel_name = #{channelName}
        </if>
        <if test="layout!=null and layout!=''">
            and layout = #{layout}
        </if>
        <if test="flag!=null and flag!=''">
            and flag = #{flag}
        </if>
        <if test="views!=null and views!=''">
            and views = #{views}
        </if>
        <if test="syncStatus!=null">
            and sync_status = #{syncStatus}
        </if>
    </where>
</sql>


<!-- Selects the Base_Column_List columns from ap_article, filtered by the Base_Column_Where fragment. -->
<select id="selectList" resultMap="resultMap">


select


<include refid="Base_Column_List"/>


from ap_article


<include refid="Base_Column_Where"/>


</select>


<!-- Sets sync_status on the single ap_article row whose id matches the parameter object's id. -->
<update id="updateSyncStatus">


UPDATE ap_article SET sync_status = #{syncStatus} WHERE id=#{id}


</update>

7.4.2 service

对 ApArticle 操作的 Service


接口位置:com.heima.migration.service.ApArticleService


/**
 * Service for reading articles and tracking whether they have been synchronized.
 */
public interface ApArticleService {

    /**
     * Looks up one article.
     *
     * @param id article id
     * @return the article for that id
     */
    public ApArticle getById(Long id);

    /**
     * Returns the articles that have not been synchronized yet.
     *
     * @return the unsynchronized articles
     */
    public List<ApArticle> getUnsyncApArticleList();

    /**
     * Updates the sync status of an article.
     *
     * @param apArticle the article whose sync status is updated
     */
    void updateSyncStatus(ApArticle apArticle);
}


ApArticleServiceImpl


对 ApArticleService 相关的操作


代码位置:com.heima.migration.service.impl.ApArticleServiceImpl


@Log4j2


@Service


public class ApArticleServiceImpl implements ApArticleService {


@Autowired


private ApArticleMapper apArticleMapper;


public ApArticle getById(Long id) {


return apArticleMapper.selectById(id);


}


/**


  • 获取未同步的数据

  • @return


*/


public List<ApArticle> getUnsyncApArticleList() {


ApArticle apArticleQuery = new ApArticle();


apArticleQuery.setSyncStatus(false);


return apArticleMapper.selectList(apArticleQuery);


}


/**


  • 更新数据同步状态

  • @param apArticle


*/


public void updateSyncStatus(ApArticle apArticle) {


log.info("开始更新数据同步状态,apArticle:{}", apArticle);


if (null != apArticle) {


apArticle.setSyncStatus(true);


apArticleMapper.updateSyncStatus(apArticle);


}


}


}


7.5 文章作者接口



7.5.1 mapper 定义

ApAuthorMapper


/**
 * Batch lookup of authors by primary key.
 *
 * @param ids author ids; the statement renders them as an SQL {@code IN} list
 * @return the authors whose id is in the list
 */
List<ApAuthor> selectByIds(List<Integer> ids);


ApAuthorMapper.xml


<!--
  Selects the ap_author rows whose id is in the given list.
  NOTE(review): with an empty list the foreach emits nothing, which leaves "where id in"
  without an operand. Callers should not pass an empty list.
-->
<select id="selectByIds" resultMap="BaseResultMap">


select * from ap_author


where id in


<foreach item="item" index="index" collection="list" open="(" separator="," close=")">


#{item}


</foreach>


</select>

7.5.2 service

对 ApAuthor 操作的 Service


接口位置:com.heima.migration.service.ApAuthorService


/**
 * Read-only service for article authors.
 */
public interface ApAuthorService {

    /**
     * Batch lookup of authors.
     *
     * @param ids author ids
     * @return the authors found for the given ids
     */
    List<ApAuthor> queryByIds(List<Integer> ids);

    /**
     * Single lookup of an author.
     *
     * @param id author id
     * @return the author for that id
     */
    ApAuthor getById(Long id);
}


ApAuthorServiceImpl


对 ApAuthor 相关的操作


代码位置:com.heima.migration.service.impl.ApAuthorServiceImpl


/**
 * Mapper-backed implementation of {@code ApAuthorService}.
 */
@Service
public class ApAuthorServiceImpl implements ApAuthorService {

    @Autowired
    private ApAuthorMapper apAuthorMapper;

    /**
     * Batch lookup of authors.
     *
     * <p>A {@code null} or empty id list short-circuits to an empty result. The
     * {@code selectByIds} statement builds an SQL {@code IN} list from the ids and
     * cannot render one without elements, so the query must not be issued.
     *
     * @param ids author ids; may be {@code null} or empty
     * @return the matching authors, or an empty mutable list when no ids were given
     */
    @Override
    public List<ApAuthor> queryByIds(List<Integer> ids) {
        if (null == ids || ids.isEmpty()) {
            return new java.util.ArrayList<ApAuthor>();
        }
        return apAuthorMapper.selectByIds(ids);
    }

    /**
     * Single lookup of an author.
     *
     * @param id author id; narrowed to {@code int} because the mapper is called
     *           with an int key
     * @return the mapper result, or {@code null} when {@code id} is {@code null}
     */
    @Override
    public ApAuthor getById(Long id) {
        if (null == id) {
            return null;
        }
        return apAuthorMapper.selectById(id.intValue());
    }
}


7.6 综合迁移接口




ArticleQuantityService


操作 ArticleQuantity 对象的 Service ArticleQuantity 对象封装了文章相关的数据


接口位置:com.heima.migration.service.ArticleQuantityService


/**
 * Service around {@code ArticleQuantity}, the object that bundles an article with its
 * related data, and around copying that data from the database to HBase.
 */
public interface ArticleQuantityService {

    /**
     * Returns the list of {@code ArticleQuantity} objects.
     *
     * @return the ArticleQuantity list
     */
    public List<ArticleQuantity> getArticleQuantityList();

    /**
     * Returns the {@code ArticleQuantity} for one article.
     *
     * @param id article id
     * @return the ArticleQuantity for that article
     */
    public ArticleQuantity getArticleQuantityByArticleId(Long id);

    /**
     * Returns the {@code ArticleQuantity} for one article, read from HBase.
     *
     * @param id article id
     * @return the ArticleQuantity read from HBase
     */
    public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id);

    /**
     * Synchronizes data from the database to HBase.
     */
    public void dbToHbase();

    /**
     * Synchronizes the database data of one article to HBase.
     *
     * @param articleId article id
     */
    public void dbToHbase(Integer articleId);
}


ArticleQuantityServiceImpl


对 ArticleQuantity 的相关操作


代码位置:com.heima.migration.service.impl.ArticleQuantityServiceImpl


/**


  • 查询未同步的数据,并封装成 ArticleQuantity 对象


*/


@Service


@Log4j2


public class ArticleQuantityServiceImpl implements ArticleQuantityService {


@Autowired


private ApArticleContenService apArticleContenService;


@Autowired


private ApArticleConfigService apArticleConfigService;


@Autowired


private ApAuthorService apAuthorService;


@Autowired


private HBaseStorageClient hBaseStorageClient;


@Autowired


private ApArticleService apArticleService;


/**


  • 查询位同步数据的列表

  • @return


*/


public List<ArticleQuantity> getArticleQuantityList() {


log.info("生成 ArticleQuantity 列表");


//查询未同步的庶数据


List<ApArticle> apArticleList = apArticleService.getUnsyncApArticleList();


if (apArticleList.isEmpty()) {


return null;


}


//获取 ArticleId 的 list


List<String> apArticleIdList = apArticleList.stream().map(apArticle -> String.valueOf(apArticle.getId())).collect(Collectors.toList());


//获取 AuthorId 的 list


List<Integer> apAuthorIdList = apArticleList.stream().map(apAuthor -> apAuthor.getAuthorId() == null ? null : apAuthor.getAuthorId().intValue()).filter(x -> x != null).collect(Collectors.toList());


//根据 apArticleIdList 批量查询出内容列表


List<ApArticleContent> apArticleContentList = apArticleContenService.queryByArticleIds(apArticleIdList);


//根据 apArticleIdList 批量查询出配置列表


List<ApArticleConfig> apArticleConfigList = apArticleConfigService.queryByArticleIds(apArticleIdList);


//根据 apAuthorIdList 批量查询出作者列


List<ApAuthor> apAuthorList = apAuthorService.queryByIds(apAuthorIdList);


//将不同的对象转换为 ArticleQuantity 对象


List<ArticleQuantity> articleQuantityList = apArticleList.stream().map(apArticle -> {


return new ArticleQuantity() {{


//设置 apArticle 对象


setApArticle(apArticle);


// 根据 apArticle.getId() 过滤出符合要求的 ApArticleContent 对象


List<ApArticleContent> apArticleContents = apArticleContentList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());


if (null != apArticleContents && !apArticleContents.isEmpty()) {


setApArticleContent(apArticleContents.get(0));


}


// 根据 apArticle.getId 过滤出 ApArticleConfig 对象


List<ApArticleConfig> apArticleConfigs = apArticleConfigList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());


if (null != apArticleConfigs && !apArticleConfigs.isEmpty()) {


setApArticleConfig(apArticleConfigs.get(0));


}


// 根据 apArticle.getAuthorId().intValue() 过滤出 ApAuthor 对象


List<ApAuthor> apAuthors = apAuthorList.stream().filter(x -> x.getId().equals(apArticle.getAuthorId().intValue())).collect(Collectors.toList());


if (null != apAuthors && !apAuthors.isEmpty()) {


setApAuthor(apAuthors.get(0));


}


//设置回调方法 用户方法的回调 用于修改同步状态 插入 Hbase 成功后同步状态改为已同步


setHBaseInvok(new ArticleHBaseInvok(apArticle, (x) -> apArticleService.updateSyncStatus(x)));


}};


}).collect(Collectors.toList());


if (null != articleQuantityList && !articleQuantityList.isEmpty()) {


log.info("生成 ArticleQuantity 列表完成,size:{}", articleQuantityList.size());


} else {


log.info("生成 ArticleQuantity 列表完成,size:{}", 0);


}


return articleQuantityList;


}


public ArticleQuantity getArticleQuantityByArticleId(Long id) {


if (null == id) {


return null;


}


ArticleQuantity articleQuantity = null;


ApArticle apArticle = apArticleService.getById(id);


if (null != apArticle) {


articleQuantity = new ArticleQuantity();


articleQuantity.setApArticle(apArticle);


ApArticleContent apArticleContent = apArticleContenService.getByArticleIds(id.intValue());


articleQuantity.setApArticleContent(apArticleContent);


ApArticleConfig apArticleConfig = apArticleConfigService.getByArticleId(id.intValue());


articleQuantity.setApArticleConfig(apArticleConfig);


ApAuthor apAuthor = apAuthorService.getById(apArticle.getAuthorId());


articleQuantity.setApAuthor(apAuthor);


}


return articleQuantity;


}


public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id) {


if (null == id) {


return null;


}


ArticleQuantity articleQuantity = null;


List<Class> typeList = Arrays.asList(ApArticle.class, ApArticleContent.class, ApArticleConfig.class, ApAuthor.class);


List<Object> objectList = hBaseStorageClient.getStorageDataEntityList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, DataConvertUtils.toString(id), typeList);


if (null != objectList && !objectList.isEmpty()) {


articleQuantity = new ArticleQuantity();


for (Object value : objectList) {


if (value instanceof ApArticle) {


articleQuantity.setApArticle((ApArticle) value);


} else if (value instanceof ApArticleContent) {


articleQuantity.setApArticleContent((ApArticleContent) value);


} else if (value instanceof ApArticleConfig) {


articleQuantity.setApArticleConfig((ApArticleConfig) value);


} else if (value instanceof ApAuthor) {


articleQuantity.setApAuthor((ApAuthor) value);


}


}


}


return articleQuantity;


}


/**


  • 数据库到 Hbase 同步


*/


public void dbToHbase() {


long cutrrentTime = System.currentTimeMillis();


List<ArticleQuantity> articleQuantitList = getArticleQuantityList();


if (null != articleQuantitList && !articleQuantitList.isEmpty()) {


log.info("开始进行定时数据库到 HBASE 同步,筛选出未同步数据量:{}", articleQuantitList.size());


if (null != articleQuantitList && !articleQuantitList.isEmpty()) {


List<HBaseStorage> hbaseStorageList = articleQuantitList.stream().map(ArticleQuantity::getHbaseStorage).collect(Collectors.toList());


hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hbaseStorageList);


}


} else {


log.info("定时数据库到 HBASE 同步为筛选出数据");


}


log.info("定时数据库到 HBASE 同步结束,耗时:{}", System.currentTimeMillis() - cutrrentTime);


}


@Override


public void dbToHbase(Integer articleId) {


long cutrrentTime = System.currentTimeMillis();


log.info("开始进行异步数据库到 HBASE 同步,articleId:{}", articleId);


if (null != articleId) {


ArticleQuantity articleQuantity = getArticleQuantityByArticleId(articleId.longValue());


if (null != articleQuantity) {


HBaseStorage hBaseStorage = articleQuantity.getHbaseStorage();


hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hBaseStorage);


}


}


log.info("异步数据库到 HBASE 同步结束,articleId:{},耗时:{}", articleId, System.currentTimeMillis() - cutrrentTime);


}


}


7.7 热点文章接口




ApHotArticleService


对 ApHotArticle 操作 Service


接口位置:com.heima.migration.service.ApHotArticleService


/**
 * Service for hot-article records and for synchronizing and expiring hot data.
 */
public interface ApHotArticleService {

    /**
     * Queries hot-article records, using the given object as the filter.
     *
     * @param apHotArticlesQuery filter object
     * @return the matching records
     */
    List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery);

    /**
     * Inserts a hot-article record.
     *
     * @param apHotArticles the record to insert
     */
    void insert(ApHotArticles apHotArticles);

    /**
     * Synchronizes the hot data of one article.
     *
     * @param apArticleId article id
     */
    public void hotApArticleSync(Integer apArticleId);

    /**
     * Deletes a hot-article record by id.
     *
     * @param id id of the record to delete
     */
    void deleteById(Integer id);

    /**
     * Queries the expired hot-article records.
     *
     * @return the expired records
     */
    public List<ApHotArticles> selectExpireMonth();

    /**
     * Deletes the hot data that belongs to the given record.
     *
     * @param apHotArticle the hot-article record to remove
     */
    void deleteHotData(ApHotArticles apHotArticle);
}


ApHotArticleServiceImpl


对 ApHotArticle 的相关操作


代码位置:com.heima.migration.service.impl.ApHotArticleServiceImpl


/**
 * Hot-data service: keeps the hot-article table in MySQL and the MongoDB copy in step
 * with the article data, and removes hot data once it has expired.
 */
@Service
@Log4j2
public class ApHotArticleServiceImpl implements ApHotArticleService {

    @Autowired
    private ApHotArticlesMapper apHotArticlesMapper;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private ArticleQuantityService articleQuantityService;

    @Autowired
    private HBaseStorageClient hBaseStorageClient;

    /**
     * Queries hot-article records through the mapper's {@code selectList}.
     *
     * @param apHotArticlesQuery filter object
     * @return the mapper result
     */
    @Override
    public List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery) {
        return apHotArticlesMapper.selectList(apHotArticlesQuery);
    }

    /**
     * Deletes a hot-article record by id.
     *
     * @param id id of the record to delete
     */
    @Override
    public void deleteById(Integer id) {
        log.info("删除热数据,apArticleId:{}", id);
        apHotArticlesMapper.deleteById(id);
    }

    /**
     * Queries the expired hot-article records through the mapper's
     * {@code selectExpireMonth}.
     *
     * @return the mapper result
     */
    @Override
    public List<ApHotArticles> selectExpireMonth() {
        return apHotArticlesMapper.selectExpireMonth();
    }

    /**
     * Removes an expired hot-article record together with its HBase row and MongoDB
     * entity.
     *
     * <p>The HBase row and the MongoDB entity are keyed by the article id: this class
     * writes the MongoDB entity with the article id as row key. The key is therefore
     * derived from the record's article id, not from the record's own primary key,
     * which would address an unrelated row.
     * NOTE(review): assumes {@code ApHotArticles} exposes {@code getArticleId()} next to
     * the {@code setArticleId} used elsewhere in this class, and that the expired-record
     * query populates it. Confirm.
     *
     * @param apHotArticle the hot-article record to remove
     */
    @Override
    public void deleteHotData(ApHotArticles apHotArticle) {
        deleteById(apHotArticle.getId());
        String rowKey = DataConvertUtils.toString(apHotArticle.getArticleId());
        hBaseStorageClient.gethBaseClent().deleteRow(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, rowKey);
        MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKey, MongoStorageEntity.class);
        if (null != mongoStorageEntity) {
            mongoTemplate.remove(mongoStorageEntity);
        }
    }

    /**
     * Inserts a hot-article record.
     *
     * @param apHotArticles the record to insert
     */
    @Override
    public void insert(ApHotArticles apHotArticles) {
        apHotArticlesMapper.insert(apHotArticles);
    }

    /**
     * Synchronizes the hot data of one article into MySQL and MongoDB. An error is
     * logged when no data can be found for the id.
     *
     * @param apArticleId article id; may be {@code null}
     */
    @Override
    public void hotApArticleSync(Integer apArticleId) {
        log.info("开始将热数据同步,apArticleId:{}", apArticleId);
        ArticleQuantity articleQuantity = getHotArticleQuantity(apArticleId);
        if (null == articleQuantity) {
            log.error("找不到对应的热数据,apArticleId:{}", apArticleId);
            return;
        }
        // MySQL first, then MongoDB.
        hotApArticleToDBSync(articleQuantity);
        hotApArticleMongoSync(articleQuantity);
        log.info("热数据同步完成,apArticleId:{}", apArticleId);
    }

    /**
     * Loads the aggregate for a hot article, trying the database first and HBase second.
     *
     * @param apArticleId article id; may be {@code null}
     * @return the aggregate, or {@code null} when the id is {@code null} or neither
     *         source has data
     */
    private ArticleQuantity getHotArticleQuantity(Integer apArticleId) {
        // Converting a null Integer to Long would throw a NullPointerException;
        // treat it as "no data".
        if (null == apArticleId) {
            return null;
        }
        Long id = Long.valueOf(apArticleId);
        ArticleQuantity articleQuantity = articleQuantityService.getArticleQuantityByArticleId(id);
        if (null == articleQuantity) {
            articleQuantity = articleQuantityService.getArticleQuantityByArticleIdForHbase(id);
        }
        return articleQuantity;
    }

    /**
     * Inserts the hot-article record into MySQL unless a record for the same article
     * already exists.
     *
     * @param articleQuantity aggregate of the hot article
     */
    public void hotApArticleToDBSync(ArticleQuantity articleQuantity) {
        Integer apArticleId = articleQuantity.getApArticleId();
        log.info("开始将热数据从 Hbase 同步到 mysql,apArticleId:{}", apArticleId);
        if (null == apArticleId) {
            log.error("apArticleId 不存在无法进行同步");
            return;
        }
        // Plain instance instead of double-brace initialization, which would create
        // an anonymous subclass.
        ApHotArticles apHotArticlesQuery = new ApHotArticles();
        apHotArticlesQuery.setArticleId(apArticleId);
        List<ApHotArticles> apHotArticlesList = apHotArticlesMapper.selectList(apHotArticlesQuery);
        if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {
            log.info("Mysql 数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
        } else {
            ApHotArticles apHotArticles = articleQuantity.getApHotArticles();
            apHotArticlesMapper.insert(apHotArticles);
        }
        log.info("将热数据从 Hbase 同步到 mysql 完成,apArticleId:{}", apArticleId);
    }

    /**
     * Stores the aggregate's storage data in MongoDB under the article id as row key,
     * unless an entity with that key already exists.
     *
     * @param articleQuantity aggregate of the hot article
     */
    public void hotApArticleMongoSync(ArticleQuantity articleQuantity) {
        Integer apArticleId = articleQuantity.getApArticleId();
        log.info("开始将热数据从 Hbase 同步到 MongoDB,apArticleId:{}", apArticleId);
        if (null == apArticleId) {
            log.error("apArticleId 不存在无法进行同步");
            return;
        }
        String rowKeyId = DataConvertUtils.toString(apArticleId);
        MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKeyId, MongoStorageEntity.class);
        if (null != mongoStorageEntity) {
            log.info("MongoDB 数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
        } else {
            List<StorageData> storageDataList = articleQuantity.getStorageDataList();
            if (null != storageDataList && !storageDataList.isEmpty()) {
                mongoStorageEntity = new MongoStorageEntity();
                mongoStorageEntity.setDataList(storageDataList);
                mongoStorageEntity.setRowKey(rowKeyId);
                mongoTemplate.insert(mongoStorageEntity);
            }
        }
        log.info("将热数据从 Hbase 同步到 MongoDB 完成,apArticleId:{}", apArticleId);
    }
}


8 定时同步数据


=======================================================================


8.1 全量数据从 mysql 同步到 HBase




/**
 * Scheduled job that runs the full MySQL-to-HBase synchronization.
 */
@Component
@DisallowConcurrentExecution
@Log4j2
public class MigrationDbToHBaseQuartz extends AbstractJob {

    // Fires every five minutes, e.g. 10:15:00, 10:20:00, 10:25:00, 10:30:00, 10:35:00.
    private static final String EVERY_FIVE_MINUTES_CRON = "0 0/5 * * * ?";

    @Autowired
    private ArticleQuantityService articleQuantityService;

    /**
     * @return the cron expressions that trigger this job
     */
    @Override
    public String[] triggerCron() {
        final String[] cronExpressions = {EVERY_FIVE_MINUTES_CRON};
        return cronExpressions;
    }

    /**
     * Runs one synchronization pass and logs its start and end.
     *
     * @param jobExecutionContext Quartz execution context (unused)
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("开始进行数据库到 HBASE 同步任务");
        articleQuantityService.dbToHbase();
        log.info("数据库到 HBASE 同步任务完成");
    }
}


8.2 定期删除过期的数据




/**
 * Scheduled job that removes expired hot data.
 */
@Component
@Log4j2
public class MigrationDeleteHotDataQuartz extends AbstractJob {

    // Fires daily at 22:30:00, e.g. 2019/8/9 22:30:00, 2019/8/10 22:30:00, ...
    private static final String DAILY_2230_CRON = "0 30 22 * * ?";

    @Autowired
    private ApHotArticleService apHotArticleService;

    /**
     * @return the cron expressions that trigger this job
     */
    @Override
    public String[] triggerCron() {
        final String[] cronExpressions = {DAILY_2230_CRON};
        return cronExpressions;
    }

    /**
     * Runs one clean-up pass and logs the elapsed time in milliseconds.
     *
     * @param jobExecutionContext Quartz execution context (unused)
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        final long startedAt = System.currentTimeMillis();
        log.info("开始删除数据库过期数据");
        deleteExpireHotData();
        final long elapsed = System.currentTimeMillis() - startedAt;
        log.info("删除数据库过期数据结束,耗时:{}", elapsed);
    }

    /**
     * Deletes every expired hot-article record, one at a time.
     */
    public void deleteExpireHotData() {
        final List<ApHotArticles> expired = apHotArticleService.selectExpireMonth();
        if (null == expired || expired.isEmpty()) {
            return;
        }
        for (ApHotArticles record : expired) {
            apHotArticleService.deleteHotData(record);
        }
    }
}


9 消息接收同步数据


=========================================================================


9.1 文章审核成功同步



9.1.1 消息发送

(1)消息名称定义及消息发送方法声明


maven_test.properties


kafka.topic.article-audit-success=kafka.topic.article.audit.success.sigle.test


kafka.properties


kafka.topic.article-audit-success=${kafka.topic.article-audit-success}


com.heima.common.kafka.KafkaTopicConfig 新增属性


/**


  • 审核成功


*/


String articleAuditSuccess;


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


com.heima.common.kafka.KafkaSender


/**
 * Wraps the audit-success payload in an {@code ArticleAuditSuccessMessage} and sends it
 * to the audit-success topic under a random UUID key.
 *
 * @param message the audit-success payload
 */
public void sendArticleAuditSuccessMessage(ArticleAuditSuccess message) {
    final ArticleAuditSuccessMessage envelope = new ArticleAuditSuccessMessage();
    envelope.setData(message);
    final String topic = kafkaTopicConfig.getArticleAuditSuccess();
    final String key = UUID.randomUUID().toString();
    this.sendMesssage(topic, key, envelope);
}


(2)修改自动审核代码,爬虫和自媒体都要修改


在审核成功后,发送消息


爬虫


// Article passed the audit (crawler source): build the audit-success payload from the
// article's id and channel id, then send it through the Kafka sender.
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.CRAWLER);
articleAuditSuccess.setChannelId(apArticle.getChannelId());
kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);


自媒体


// Article passed the audit (self-media source): build the audit-success payload from the
// article's id and channel id, then send it through the Kafka sender.
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.MEDIA);
articleAuditSuccess.setChannelId(apArticle.getChannelId());
kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);

9.1.2 消息接收

/**


  • 热点文章监听类


*/


@Component


@Log4j2


public class MigrationAuditSucessArticleListener implements KafkaListener<String, String> {


/**


  • 通用转换 mapper


*/


@Autowired


ObjectMapper mapper;


/**


  • kafka 主题 配置


*/


@Autowired


KafkaTopicConfig kafkaTopicConfig;


@Autowired


private ArticleQuantityService articleQuantityService;


@Override


public String topic() {


return kafkaTopicConfig.getArticleAuditSuccess();


}


/**


  • 监听消息

  • @param data

  • @param consumer


*/


@Override


public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {


log.info("kafka 接收到审核通过消息:{}", data);


String value = (String) data.value();


if (null != value) {


ArticleAuditSuccessMessage message = null;


try {


message = mapper.readValue(value, ArticleAuditSuccessMessage.class);


} catch (IOException e) {


e.printStackTrace();


}


ArticleAuditSuccess auto = message.getData();


if (null != auto) {


//调用方法 将 HBAESE 中的热数据进行同步


Integer articleId = auto.getArticleId();


if (null != articleId) {


articleQuantityService.dbToHbase(articleId);


}


}


}


}


}


9.2 热点文章同步




创建监听类:com.heima.migration.kafka.listener.MigrationHotArticleListener


/**


  • 热点文章监听类


*/


@Component


@Log4j2


public class MigrationHotArticleListener implements KafkaListener<String, String> {


/**


  • 通用转换 mapper


*/


@Autowired


ObjectMapper mapper;


/**


  • kafka 主题 配置


*/


@Autowired


KafkaTopicConfig kafkaTopicConfig;


/**


  • 热点文章 service 注入


*/


@Autowired


private ApHotArticleService apHotArticleService;


@Override


public String topic() {


return kafkaTopicConfig.getHotArticle();


}


/**


  • 监听消息

  • @param data

  • @param consumer


*/


@Override


public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {


log.info("kafka 接收到热数据同步消息:{}", data);


String value = (String) data.value();

评论

发布
暂无评论
【大厂技术内幕】字节跳动原来是这么做数据迁移的!