【大厂技术内幕】字节跳动原来是这么做数据迁移的!
5.2.1 mapper
ApArticleConfigMapper 中新增方法
ApArticleConfigMapper.xml
5.2.2 service
对文章配置操作的 service
ApArticleConfigServiceImpl 是对 ApArticleConfig 的操作
5.3.1 mapper 定义
ApArticleContentMapper 新增方法
/**
 * Batch lookup of article content rows.
 *
 * @param articleIds article ids, passed as strings
 * @return the matching content rows (presumably one per article id; the SQL is not shown here)
 */
List<ApArticleContent> selectByArticleIds(List<String> articleIds);
5.3.2 service
对文章内容操作的 Service
/**
 * Read-only access to article content records.
 */
public interface ApArticleContenService {
/**
 * Looks up the content records for a batch of articles.
 *
 * @param ids article ids, as strings
 * @return the content records found for those ids
 */
List<ApArticleContent> queryByArticleIds(List<String> ids);
/**
 * Looks up the content record of a single article (despite the plural method name).
 *
 * @param id article id
 * @return the content record for that article
 */
ApArticleContent getByArticleIds(Integer id);
}
ApArticleContenServiceImpl
对 ApArticleConten 相关的操作
代码位置:com.heima.migration.service.impl.ApArticleContenServiceImpl
/**
 * Mapper-backed implementation of {@link ApArticleContenService}; every call is forwarded
 * unchanged to {@code ApArticleContentMapper}.
 */
@Service
public class ApArticleContenServiceImpl implements ApArticleContenService {

    @Autowired
    private ApArticleContentMapper apArticleContentMapper;

    /**
     * Fetches the content records of several articles in one mapper call.
     *
     * @param ids article ids, as strings
     * @return whatever the mapper returns for those ids
     */
    @Override
    public List<ApArticleContent> queryByArticleIds(List<String> ids) {
        final List<ApArticleContent> contents = apArticleContentMapper.selectByArticleIds(ids);
        return contents;
    }

    /**
     * Fetches the content record of one article.
     *
     * @param id article id
     * @return whatever the mapper returns for that id
     */
    @Override
    public ApArticleContent getByArticleIds(Integer id) {
        final ApArticleContent content = apArticleContentMapper.selectByArticleId(id);
        return content;
    }
}
7.4.1 mapper 定义
ApArticleMapper 新增方法
/**
 * Query by example: the properties of {@code apArticle} that pass the mapper's
 * tests become equality filters on {@code ap_article}.
 *
 * @param apArticle example object carrying the filter values
 * @return the matching articles
 */
List<ApArticle> selectList(ApArticle apArticle);
/**
 * Writes the {@code syncStatus} of the given article to the row with the same id.
 *
 * @param apArticle supplies the row id and the sync status to store
 */
void updateSyncStatus(ApArticle apArticle);
ApArticleMapper.xml
<!--
  Shared WHERE fragment for ap_article queries. Each property of the parameter object
  that passes its test contributes one equality condition; syncStatus only needs to be non-null.
  NOTE(review): authorId is numeric (callers invoke intValue() on it) and channelId, layout,
  flag and views may be numeric as well. As far as I know OGNL treats a numeric 0 as equal
  to '', so a filter value of 0 would be skipped by the != '' tests. Confirm the property
  types and drop the empty-string comparison for numeric ones.
-->
<sql id="Base_Column_Where">
<where>
<if test="title!=null and title!=''">
and title = #{title}
</if>
<if test="authorId!=null and authorId!=''">
and author_id = #{authorId}
</if>
<if test="authorName!=null and authorName!=''">
and author_name = #{authorName}
</if>
<if test="channelId!=null and channelId!=''">
and channel_id = #{channelId}
</if>
<if test="channelName!=null and channelName!=''">
and channel_name = #{channelName}
</if>
<if test="layout!=null and layout!=''">
and layout = #{layout}
</if>
<if test="flag!=null and flag!=''">
and flag = #{flag}
</if>
<if test="views!=null and views!=''">
and views = #{views}
</if>
<if test="syncStatus!=null">
and sync_status = #{syncStatus}
</if>
</where>
</sql>
<!--
  Query by example over ap_article: selects the Base_Column_List columns and applies
  the Base_Column_Where fragment to the parameter object.
-->
<select id="selectList" resultMap="resultMap">
select
<include refid="Base_Column_List"/>
from ap_article
<include refid="Base_Column_Where"/>
</select>
<!-- Stores the parameter's syncStatus in the ap_article row whose id equals the parameter's id. -->
<update id="updateSyncStatus">
UPDATE ap_article SET sync_status = #{syncStatus} WHERE id=#{id}
</update>
7.4.2 service
对 ApArticle 操作的 Service
接口位置:com.heima.migration.service.ApArticleService
/**
 * Operations on {@code ApArticle} records needed by the migration module.
 */
public interface ApArticleService {

    /**
     * Loads a single article.
     *
     * @param id article id
     * @return the article with that id
     */
    ApArticle getById(Long id);

    /**
     * Lists the articles that have not been synchronized yet.
     *
     * @return the unsynchronized articles
     */
    List<ApArticle> getUnsyncApArticleList();

    /**
     * Updates the synchronization status of the given article.
     *
     * @param apArticle the article whose status is updated
     */
    void updateSyncStatus(ApArticle apArticle);
}
ApArticleServiceImpl
对 ApArticleService 相关的操作
代码位置:com.heima.migration.service.impl.ApArticleServiceImpl
@Log4j2
@Service
public class ApArticleServiceImpl implements ApArticleService {
@Autowired
private ApArticleMapper apArticleMapper;
public ApArticle getById(Long id) {
return apArticleMapper.selectById(id);
}
/**
获取未同步的数据
@return
*/
public List<ApArticle> getUnsyncApArticleList() {
ApArticle apArticleQuery = new ApArticle();
apArticleQuery.setSyncStatus(false);
return apArticleMapper.selectList(apArticleQuery);
}
/**
更新数据同步状态
@param apArticle
*/
public void updateSyncStatus(ApArticle apArticle) {
log.info("开始更新数据同步状态,apArticle:{}", apArticle);
if (null != apArticle) {
apArticle.setSyncStatus(true);
apArticleMapper.updateSyncStatus(apArticle);
}
}
}
7.5.1 mapper 定义
ApAuthorMapper
/**
 * Batch lookup of authors by primary key.
 *
 * @param ids author ids; the mapper statement expands them into an IN list
 * @return the authors whose id is in {@code ids}
 */
List<ApAuthor> selectByIds(List<Integer> ids);
ApAuthorMapper.xml
<!--
  Selects the ap_author rows whose id is in the given list; the foreach expands the
  list into a parenthesized, comma-separated set of bound parameters.
  NOTE(review): with an empty list the foreach presumably emits nothing, which would leave
  "where id in" without a value list. Callers should pass a non-empty list. TODO confirm.
-->
<select id="selectByIds" resultMap="BaseResultMap">
select * from ap_author
where id in
<foreach item="item" index="index" collection="list" open="(" separator="," close=")">
#{item}
</foreach>
</select>
7.5.2 service
对 ApAuthor 操作的 Service
接口位置:com.heima.migration.service.ApAuthorService
/**
 * Read-only access to {@code ApAuthor} records.
 */
public interface ApAuthorService {
/**
 * Looks up several authors at once.
 *
 * @param ids author ids
 * @return the authors found for those ids
 */
List<ApAuthor> queryByIds(List<Integer> ids);
/**
 * Looks up a single author.
 *
 * @param id author id
 * @return the author with that id
 */
ApAuthor getById(Long id);
}
ApAuthorServiceImpl
对 ApAuthor 相关的操作
代码位置:com.heima.migration.service.impl.ApAuthorServiceImpl
/**
 * Mapper-backed implementation of {@link ApAuthorService}.
 */
@Service
public class ApAuthorServiceImpl implements ApAuthorService {

    @Autowired
    private ApAuthorMapper apAuthorMapper;

    /**
     * Looks up several authors at once.
     *
     * @param ids author ids; may be {@code null} or empty
     * @return the matching authors; an empty list when no ids were given
     */
    @Override
    public List<ApAuthor> queryByIds(List<Integer> ids) {
        // The mapper statement builds "where id in (...)" from the list, so an empty
        // list must not reach it: there would be nothing to put inside the IN clause.
        if (ids == null || ids.isEmpty()) {
            return java.util.Collections.emptyList();
        }
        return apAuthorMapper.selectByIds(ids);
    }

    /**
     * Looks up a single author.
     *
     * @param id author id; narrowed to {@code int} for the mapper call
     * @return the author, or {@code null} when {@code id} is {@code null}
     */
    @Override
    public ApAuthor getById(Long id) {
        if (null == id) {
            return null;
        }
        return apAuthorMapper.selectById(id.intValue());
    }
}
ArticleQuantityService
操作 ArticleQuantity 对象的 Service。ArticleQuantity 对象封装了文章相关的数据。
接口位置:com.heima.migration.service.ArticleQuantityService
/**
 * Builds {@code ArticleQuantity} aggregates and synchronizes them from the database to HBase.
 */
public interface ArticleQuantityService {

    /**
     * Returns the list of {@code ArticleQuantity} objects.
     *
     * @return the ArticleQuantity list
     */
    List<ArticleQuantity> getArticleQuantityList();

    /**
     * Returns the {@code ArticleQuantity} for one article.
     *
     * @param id article id
     * @return the ArticleQuantity for that article
     */
    ArticleQuantity getArticleQuantityByArticleId(Long id);

    /**
     * Returns the {@code ArticleQuantity} for one article, read from HBase.
     *
     * @param id article id
     * @return the ArticleQuantity read from HBase
     */
    ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id);

    /**
     * Synchronizes data from the database to HBase.
     */
    void dbToHbase();

    /**
     * Synchronizes the database data of a single article to HBase.
     *
     * @param articleId id of the article to synchronize
     */
    void dbToHbase(Integer articleId);
}
ArticleQuantityServiceImpl
对 ArticleQuantity 的相关操作
代码位置:com.heima.migration.service.impl.ArticleQuantityServiceImpl
/**
查询未同步的数据,并封装成 ArticleQuantity 对象
*/
@Service
@Log4j2
public class ArticleQuantityServiceImpl implements ArticleQuantityService {
@Autowired
private ApArticleContenService apArticleContenService;
@Autowired
private ApArticleConfigService apArticleConfigService;
@Autowired
private ApAuthorService apAuthorService;
@Autowired
private HBaseStorageClient hBaseStorageClient;
@Autowired
private ApArticleService apArticleService;
/**
查询位同步数据的列表
@return
*/
public List<ArticleQuantity> getArticleQuantityList() {
log.info("生成 ArticleQuantity 列表");
//查询未同步的庶数据
List<ApArticle> apArticleList = apArticleService.getUnsyncApArticleList();
if (apArticleList.isEmpty()) {
return null;
}
//获取 ArticleId 的 list
List<String> apArticleIdList = apArticleList.stream().map(apArticle -> String.valueOf(apArticle.getId())).collect(Collectors.toList());
//获取 AuthorId 的 list
List<Integer> apAuthorIdList = apArticleList.stream().map(apAuthor -> apAuthor.getAuthorId() == null ? null : apAuthor.getAuthorId().intValue()).filter(x -> x != null).collect(Collectors.toList());
//根据 apArticleIdList 批量查询出内容列表
List<ApArticleContent> apArticleContentList = apArticleContenService.queryByArticleIds(apArticleIdList);
//根据 apArticleIdList 批量查询出配置列表
List<ApArticleConfig> apArticleConfigList = apArticleConfigService.queryByArticleIds(apArticleIdList);
//根据 apAuthorIdList 批量查询出作者列
List<ApAuthor> apAuthorList = apAuthorService.queryByIds(apAuthorIdList);
//将不同的对象转换为 ArticleQuantity 对象
List<ArticleQuantity> articleQuantityList = apArticleList.stream().map(apArticle -> {
return new ArticleQuantity() {{
//设置 apArticle 对象
setApArticle(apArticle);
// 根据 apArticle.getId() 过滤出符合要求的 ApArticleContent 对象
List<ApArticleContent> apArticleContents = apArticleContentList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());
if (null != apArticleContents && !apArticleContents.isEmpty()) {
setApArticleContent(apArticleContents.get(0));
}
// 根据 apArticle.getId 过滤出 ApArticleConfig 对象
List<ApArticleConfig> apArticleConfigs = apArticleConfigList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());
if (null != apArticleConfigs && !apArticleConfigs.isEmpty()) {
setApArticleConfig(apArticleConfigs.get(0));
}
// 根据 apArticle.getAuthorId().intValue() 过滤出 ApAuthor 对象
List<ApAuthor> apAuthors = apAuthorList.stream().filter(x -> x.getId().equals(apArticle.getAuthorId().intValue())).collect(Collectors.toList());
if (null != apAuthors && !apAuthors.isEmpty()) {
setApAuthor(apAuthors.get(0));
}
//设置回调方法 用户方法的回调 用于修改同步状态 插入 Hbase 成功后同步状态改为已同步
setHBaseInvok(new ArticleHBaseInvok(apArticle, (x) -> apArticleService.updateSyncStatus(x)));
}};
}).collect(Collectors.toList());
if (null != articleQuantityList && !articleQuantityList.isEmpty()) {
log.info("生成 ArticleQuantity 列表完成,size:{}", articleQuantityList.size());
} else {
log.info("生成 ArticleQuantity 列表完成,size:{}", 0);
}
return articleQuantityList;
}
public ArticleQuantity getArticleQuantityByArticleId(Long id) {
if (null == id) {
return null;
}
ArticleQuantity articleQuantity = null;
ApArticle apArticle = apArticleService.getById(id);
if (null != apArticle) {
articleQuantity = new ArticleQuantity();
articleQuantity.setApArticle(apArticle);
ApArticleContent apArticleContent = apArticleContenService.getByArticleIds(id.intValue());
articleQuantity.setApArticleContent(apArticleContent);
ApArticleConfig apArticleConfig = apArticleConfigService.getByArticleId(id.intValue());
articleQuantity.setApArticleConfig(apArticleConfig);
ApAuthor apAuthor = apAuthorService.getById(apArticle.getAuthorId());
articleQuantity.setApAuthor(apAuthor);
}
return articleQuantity;
}
public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id) {
if (null == id) {
return null;
}
ArticleQuantity articleQuantity = null;
List<Class> typeList = Arrays.asList(ApArticle.class, ApArticleContent.class, ApArticleConfig.class, ApAuthor.class);
List<Object> objectList = hBaseStorageClient.getStorageDataEntityList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, DataConvertUtils.toString(id), typeList);
if (null != objectList && !objectList.isEmpty()) {
articleQuantity = new ArticleQuantity();
for (Object value : objectList) {
if (value instanceof ApArticle) {
articleQuantity.setApArticle((ApArticle) value);
} else if (value instanceof ApArticleContent) {
articleQuantity.setApArticleContent((ApArticleContent) value);
} else if (value instanceof ApArticleConfig) {
articleQuantity.setApArticleConfig((ApArticleConfig) value);
} else if (value instanceof ApAuthor) {
articleQuantity.setApAuthor((ApAuthor) value);
}
}
}
return articleQuantity;
}
/**
数据库到 Hbase 同步
*/
public void dbToHbase() {
long cutrrentTime = System.currentTimeMillis();
List<ArticleQuantity> articleQuantitList = getArticleQuantityList();
if (null != articleQuantitList && !articleQuantitList.isEmpty()) {
log.info("开始进行定时数据库到 HBASE 同步,筛选出未同步数据量:{}", articleQuantitList.size());
if (null != articleQuantitList && !articleQuantitList.isEmpty()) {
List<HBaseStorage> hbaseStorageList = articleQuantitList.stream().map(ArticleQuantity::getHbaseStorage).collect(Collectors.toList());
hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hbaseStorageList);
}
} else {
log.info("定时数据库到 HBASE 同步为筛选出数据");
}
log.info("定时数据库到 HBASE 同步结束,耗时:{}", System.currentTimeMillis() - cutrrentTime);
}
@Override
public void dbToHbase(Integer articleId) {
long cutrrentTime = System.currentTimeMillis();
log.info("开始进行异步数据库到 HBASE 同步,articleId:{}", articleId);
if (null != articleId) {
ArticleQuantity articleQuantity = getArticleQuantityByArticleId(articleId.longValue());
if (null != articleQuantity) {
HBaseStorage hBaseStorage = articleQuantity.getHbaseStorage();
hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hBaseStorage);
}
}
log.info("异步数据库到 HBASE 同步结束,articleId:{},耗时:{}", articleId, System.currentTimeMillis() - cutrrentTime);
}
}
ApHotArticleService
对 ApHotArticle 操作 Service
接口位置:com.heima.migration.service.ApHotArticleService
/**
 * Operations on hot-article records ({@code ApHotArticles}).
 */
public interface ApHotArticleService {

    /**
     * Lists hot-article records matching the given query object.
     *
     * @param apHotArticlesQuery query object carrying the filter values
     * @return the matching records
     */
    List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery);

    /**
     * Inserts a hot-article record.
     *
     * @param apHotArticles the record to insert
     */
    void insert(ApHotArticles apHotArticles);

    /**
     * Synchronizes the hot data of one article.
     *
     * @param apArticleId article id
     */
    void hotApArticleSync(Integer apArticleId);

    /**
     * Deletes a hot-article record by id.
     *
     * @param id record id
     */
    void deleteById(Integer id);

    /**
     * Lists the expired hot-article records.
     *
     * @return the expired records
     */
    List<ApHotArticles> selectExpireMonth();

    /**
     * Deletes the hot data belonging to the given record.
     *
     * @param apHotArticle the record whose hot data is removed
     */
    void deleteHotData(ApHotArticles apHotArticle);
}
ApHotArticleServiceImpl
对 ApHotArticle 的相关操作
代码位置:com.heima.migration.service.impl.ApHotArticleServiceImpl
/**
 * Hot-article data service: keeps the hot-article table and the MongoDB copy in step with
 * the article aggregates, and removes hot data again.
 */
@Service
@Log4j2
public class ApHotArticleServiceImpl implements ApHotArticleService {

    @Autowired
    private ApHotArticlesMapper apHotArticlesMapper;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private ArticleQuantityService articleQuantityService;

    @Autowired
    private HBaseStorageClient hBaseStorageClient;

    /**
     * Lists hot-article records matching the given query object.
     *
     * @param apHotArticlesQuery query object handed to the mapper
     * @return the matching records
     */
    @Override
    public List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery) {
        return apHotArticlesMapper.selectList(apHotArticlesQuery);
    }

    /**
     * Deletes a hot-article record by id.
     *
     * @param id record id
     */
    @Override
    public void deleteById(Integer id) {
        log.info("删除热数据,apArticleId:{}", id);
        apHotArticlesMapper.deleteById(id);
    }

    /**
     * Lists the records the mapper considers expired (older than one month, according to
     * the original comment; the SQL is not visible here).
     *
     * @return the expired records
     */
    @Override
    public List<ApHotArticles> selectExpireMonth() {
        return apHotArticlesMapper.selectExpireMonth();
    }

    /**
     * Removes a hot-article record together with its HBase row and its MongoDB document.
     *
     * @param apHotArticle the record to remove
     */
    @Override
    public void deleteHotData(ApHotArticles apHotArticle) {
        deleteById(apHotArticle.getId());
        // NOTE(review): the row key here comes from the record's own id, while
        // hotApArticleMongoSync keys the MongoDB document by the article id. If the two
        // ids can differ this removes the wrong row/document. Confirm before changing.
        String rowKey = DataConvertUtils.toString(apHotArticle.getId());
        hBaseStorageClient.gethBaseClent().deleteRow(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, rowKey);
        MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKey, MongoStorageEntity.class);
        if (null != mongoStorageEntity) {
            mongoTemplate.remove(mongoStorageEntity);
        }
    }

    /**
     * Inserts a hot-article record.
     *
     * @param apHotArticles the record to insert
     */
    @Override
    public void insert(ApHotArticles apHotArticles) {
        apHotArticlesMapper.insert(apHotArticles);
    }

    /**
     * Synchronizes the hot data of one article to the database and to MongoDB.
     * When no aggregate can be found for the id, an error is logged and nothing is written.
     *
     * @param apArticleId article id; {@code null} is treated as "not found"
     */
    @Override
    public void hotApArticleSync(Integer apArticleId) {
        log.info("开始将热数据同步,apArticleId:{}", apArticleId);
        ArticleQuantity articleQuantity = getHotArticleQuantity(apArticleId);
        if (null == articleQuantity) {
            log.error("找不到对应的热数据,apArticleId:{}", apArticleId);
            return;
        }
        // Hot-article table first, then the MongoDB copy.
        hotApArticleToDBSync(articleQuantity);
        hotApArticleMongoSync(articleQuantity);
        log.info("热数据同步完成,apArticleId:{}", apArticleId);
    }

    /**
     * Loads the aggregate for an article, from the database first and from HBase as fallback.
     *
     * @param apArticleId article id
     * @return the aggregate, or {@code null} when the id is {@code null} or nothing is found
     */
    private ArticleQuantity getHotArticleQuantity(Integer apArticleId) {
        // Guard before converting: unboxing a null Integer would throw instead of
        // letting the caller log that the data could not be found.
        if (null == apArticleId) {
            return null;
        }
        Long id = Long.valueOf(apArticleId.longValue());
        ArticleQuantity articleQuantity = articleQuantityService.getArticleQuantityByArticleId(id);
        if (null == articleQuantity) {
            articleQuantity = articleQuantityService.getArticleQuantityByArticleIdForHbase(id);
        }
        return articleQuantity;
    }

    /**
     * Inserts the hot-article record of the aggregate unless a record for the same
     * article id already exists.
     *
     * @param articleQuantity aggregate to synchronize
     */
    public void hotApArticleToDBSync(ArticleQuantity articleQuantity) {
        Integer apArticleId = articleQuantity.getApArticleId();
        log.info("开始将热数据从 Hbase 同步到 mysql,apArticleId:{}", apArticleId);
        if (null == apArticleId) {
            log.error("apArticleId 不存在无法进行同步");
            return;
        }
        // Plain instance rather than a double-brace initializer, which would hand the
        // mapper an anonymous subclass that also references this service.
        ApHotArticles apHotArticlesQuery = new ApHotArticles();
        apHotArticlesQuery.setArticleId(apArticleId);
        List<ApHotArticles> apHotArticlesList = apHotArticlesMapper.selectList(apHotArticlesQuery);
        if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {
            log.info("Mysql 数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
        } else {
            ApHotArticles apHotArticles = articleQuantity.getApHotArticles();
            apHotArticlesMapper.insert(apHotArticles);
        }
        log.info("将热数据从 Hbase 同步到 mysql 完成,apArticleId:{}", apArticleId);
    }

    /**
     * Stores the aggregate's storage data in MongoDB under the article id unless a
     * document with that key already exists.
     *
     * @param articleQuantity aggregate to synchronize
     */
    public void hotApArticleMongoSync(ArticleQuantity articleQuantity) {
        Integer apArticleId = articleQuantity.getApArticleId();
        log.info("开始将热数据从 Hbase 同步到 MongoDB,apArticleId:{}", apArticleId);
        if (null == apArticleId) {
            log.error("apArticleId 不存在无法进行同步");
            return;
        }
        String rowKeyId = DataConvertUtils.toString(apArticleId);
        MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKeyId, MongoStorageEntity.class);
        if (null != mongoStorageEntity) {
            log.info("MongoDB 数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
        } else {
            List<StorageData> storageDataList = articleQuantity.getStorageDataList();
            // Nothing is inserted when the aggregate has no storage data.
            if (null != storageDataList && !storageDataList.isEmpty()) {
                mongoStorageEntity = new MongoStorageEntity();
                mongoStorageEntity.setDataList(storageDataList);
                mongoStorageEntity.setRowKey(rowKeyId);
                mongoTemplate.insert(mongoStorageEntity);
            }
        }
        log.info("将热数据从 Hbase 同步到 MongoDB 完成,apArticleId:{}", apArticleId);
    }
}
=======================================================================
/**
 * Scheduled job that runs the full database-to-HBase synchronization.
 */
@Component
@DisallowConcurrentExecution
@Log4j2
public class MigrationDbToHBaseQuartz extends AbstractJob {

    @Autowired
    private ArticleQuantityService articleQuantityService;

    /**
     * Cron schedule of the job.
     *
     * @return a single expression that fires on every fifth minute
     *         (for example 10:15:00, 10:20:00, 10:25:00, 10:30:00, 10:35:00)
     */
    @Override
    public String[] triggerCron() {
        String[] cronExpressions = {"0 0/5 * * * ?"};
        return cronExpressions;
    }

    /**
     * Runs one synchronization pass, logging its start and end.
     *
     * @param jobExecutionContext execution context supplied by the scheduler (unused)
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("开始进行数据库到 HBASE 同步任务");
        articleQuantityService.dbToHbase();
        log.info("数据库到 HBASE 同步任务完成");
    }
}
/**
 * Scheduled job that periodically deletes expired hot data.
 */
@Component
@Log4j2
public class MigrationDeleteHotDataQuartz extends AbstractJob {

    @Autowired
    private ApHotArticleService apHotArticleService;

    /**
     * Cron schedule of the job.
     *
     * @return a single expression that fires every day at 22:30:00
     */
    @Override
    public String[] triggerCron() {
        String[] cronExpressions = {"0 30 22 * * ?"};
        return cronExpressions;
    }

    /**
     * Runs one clean-up pass and logs how long it took.
     *
     * @param jobExecutionContext execution context supplied by the scheduler (unused)
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        final long startedAt = System.currentTimeMillis();
        log.info("开始删除数据库过期数据");
        deleteExpireHotData();
        final long elapsed = System.currentTimeMillis() - startedAt;
        log.info("删除数据库过期数据结束,耗时:{}", elapsed);
    }

    /**
     * Deletes the hot data of every record reported as expired by the service.
     */
    public void deleteExpireHotData() {
        List<ApHotArticles> expiredArticles = apHotArticleService.selectExpireMonth();
        if (null == expiredArticles || expiredArticles.isEmpty()) {
            return;
        }
        expiredArticles.forEach(expired -> apHotArticleService.deleteHotData(expired));
    }
}
=========================================================================
9.1.1 消息发送
(1)消息名称定义及消息发送方法声明
maven_test.properties
kafka.topic.article-audit-success=kafka.topic.article.audit.success.sigle.test
kafka.properties
kafka.topic.article-audit-success=${kafka.topic.article-audit-success}
com.heima.common.kafka.KafkaTopicConfig 新增属性
/**
审核成功
*/
String articleAudi
tSuccess;
com.heima.common.kafka.KafkaSender
/**
 * Sends an "article audit succeeded" message.
 * The payload is wrapped in an {@code ArticleAuditSuccessMessage} and sent to the
 * configured audit-success topic under a freshly generated random UUID key.
 *
 * @param message the audit result to publish
 */
public void sendArticleAuditSuccessMessage(ArticleAuditSuccess message) {
    final ArticleAuditSuccessMessage envelope = new ArticleAuditSuccessMessage();
    envelope.setData(message);
    final String topic = kafkaTopicConfig.getArticleAuditSuccess();
    final String messageKey = UUID.randomUUID().toString();
    this.sendMesssage(topic, messageKey, envelope);
}
(2)修改自动审核代码,爬虫和自媒体都要修改
在审核成功后,发送消息
爬虫
// The article passed the audit: publish an audit-success message for it,
// tagged as a crawler article and carrying the article id and channel id.
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.CRAWLER);
articleAuditSuccess.setChannelId(apArticle.getChannelId());
kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);
自媒体
// The article passed the audit: publish an audit-success message for it,
// tagged as a self-media article and carrying the article id and channel id.
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.MEDIA);
articleAuditSuccess.setChannelId(apArticle.getChannelId());
kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);
9.1.2 消息接收
/**
热点文章监听类
*/
@Component
@Log4j2
public class MigrationAuditSucessArticleListener implements KafkaListener<String, String> {
/**
通用转换 mapper
*/
@Autowired
ObjectMapper mapper;
/**
kafka 主题 配置
*/
@Autowired
KafkaTopicConfig kafkaTopicConfig;
@Autowired
private ArticleQuantityService articleQuantityService;
@Override
public String topic() {
return kafkaTopicConfig.getArticleAuditSuccess();
}
/**
监听消息
@param data
@param consumer
*/
@Override
public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
log.info("kafka 接收到审核通过消息:{}", data);
String value = (String) data.value();
if (null != value) {
ArticleAuditSuccessMessage message = null;
try {
message = mapper.readValue(value, ArticleAuditSuccessMessage.class);
} catch (IOException e) {
e.printStackTrace();
}
ArticleAuditSuccess auto = message.getData();
if (null != auto) {
//调用方法 将 HBAESE 中的热数据进行同步
Integer articleId = auto.getArticleId();
if (null != articleId) {
articleQuantityService.dbToHbase(articleId);
}
}
}
}
}
创建监听类:com.heima.migration.kafka.listener.MigrationHotArticleListener
/**
热点文章监听类
*/
@Component
@Log4j2
public class MigrationHotArticleListener implements KafkaListener<String, String> {
/**
通用转换 mapper
*/
@Autowired
ObjectMapper mapper;
/**
kafka 主题 配置
*/
@Autowired
KafkaTopicConfig kafkaTopicConfig;
/**
热点文章 service 注入
*/
@Autowired
private ApHotArticleService apHotArticleService;
@Override
public String topic() {
return kafkaTopicConfig.getHotArticle();
}
/**
监听消息
@param data
@param consumer
*/
@Override
public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
log.info("kafka 接收到热数据同步消息:{}", data);
String value = (String) data.value();
评论