写点什么

大数据 -11-MapReduce JOIN 操作的 Java 实现 Driver Mapper Reducer 具体实现逻辑 模拟 SQL 进行联表操作

作者:武子康
  • 2025-06-13
    山东
  • 本文字数:6524 字

    阅读完需:约 21 分钟

大数据-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

目前 2025 年 06 月 13 日更新到:AI 炼丹日志-28 - Audiblez 将你的电子书 epub 转换为音频 mp3 做有声书,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 06 月 11 日更新到:Java-44 深入浅出 Nginx - 底层进程机制 Master Worker 机制原理 常用指令 MyBatis 已完结,Spring 已完结,深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!目前 2025 年 06 月 13 日更新到:大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上一节我们完成了:


  • MapReduce 的介绍

  • Hadoop 序列化介绍

  • Mapper 编写规范

  • Reducer 编写规范

  • Driver 编写规范

  • WordCount 功能开发

  • WordCount 本地测试

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个 Hadoop 的学习环境,供我学习。


  • 2C4G 编号 h121

  • 2C4G 编号 h122

  • 2C2G 编号 h123


MapReduce Reduce-Side Join

这是最通用的 JOIN 方式。Mapper 负责将相同 key 的数据发往同一个 Reducer,由 Reducer 进行 JOIN。

对应特点

  • 可以处理 任意大小的两个数据集

  • 实现相对简单

  • 网络传输量大,效率相对低

实现思路

  • 每个输入记录由 Mapper 输出为 (join_key, record) 形式,并附带一个标记来区分来源(如 “A”, “B”)

  • Shuffle 阶段会将相同的 key 聚合到同一个 Reducer

  • Reducer 对 key 下的数据分为 A/B 两类,执行 JOIN 逻辑(如嵌套循环或哈希匹配)


# Mapperdef map(key, value):    table, row = parse(value)    join_key = row['id']    emit(join_key, (table, row))
# Reducerdef reduce(join_key, values): a_rows = [v for t, v in values if t == 'A'] b_rows = [v for t, v in values if t == 'B'] for a in a_rows: for b in b_rows: emit(join_key, join(a, b))
复制代码

Map-Side Join(映射端 JOIN)

适用于一个数据集很小(可放入内存中),另一个很大时的情况。

对应特点

  • 无需 Shuffle,效率高

  • 前提是小表 可以完全加载到每个 Mapper 的内存中

实现思路

  • 在作业启动前,将小表通过分布式缓存(Distributed Cache)分发到每个 Mapper 节点

  • Mapper 启动时加载小表到内存,然后大表逐行处理,与内存中的小表匹配 JOIN


# Mapper setupdef setup():    global small_table    small_table = load_from_cache()
# Mapperdef map(key, large_row): join_key = large_row['id'] if join_key in small_table: emit(join_key, join(large_row, small_table[join_key]))
复制代码

Semi-Join(半连接)

用于减少数据传输量,常用于大型数据预过滤。

对应特点

  • 先将小表的 JOIN key 发送给大表所在 Mapper 节点

  • 大表根据 key 预过滤,避免无关数据进入后续 JOIN 阶段

  • 是一种优化策略,常配合 Reduce-Side Join 使用

Bloom Join(基于布隆过滤器)

适用于极端大数据量 + 高 selectivity 的场景。

对应特点

  • 用布隆过滤器压缩小表 key 并广播

  • 大表 Mapper 用 Bloom Filter 进行预筛选

  • 是一种概率性过滤,可能有少量假阳性,但大大减少数据量


业务需求

平常我们在业务上,有很多时候表都是分开的,通过一些 id 或者 code 来进行关联。在大数据的情况下,也有很多这种情况,我们需要进行联表操作。

表 1

项目编码projectCode 项目名projectName
复制代码

表 2

项目编码projectCode 项目类型projectType 项目分类projectFrom
复制代码


SQL 中,可以通过 LEFT JOIN 来实现字段补齐。大数据下,也需要进行这样的操作,我们需要借助 MapReduce

表 1 测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"  "社区项目1""02d9c090-e467-42b6-9c14-52cacd72a4a8"  "社区项目2""244dcaca-0778-4eec-b3a2-403f8fac1dfb"  "智慧社区""94befb97-d1af-43f2-b5d5-6df9ce5b9393"  "公交站点""f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"  "街道布建""2e556d83-bb56-45b1-8d6e-00510902c464"  "街道公交站点""3ba00542-eac9-4399-9c2b-3b06e671f4c9"  "未命名项目1""5a5982d7-7257-422f-822a-a0c2f31c28d1"  "未命名项目2"
复制代码

表 2 测试

"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"  "重要类型"  "种类1""02d9c090-e467-42b6-9c14-52cacd72a4a8"  "重要类型"  "种类1""244dcaca-0778-4eec-b3a2-403f8fac1dfb"  "重要类型"  "种类1""94befb97-d1af-43f2-b5d5-6df9ce5b9393"  "普通类型"  "种类1""f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"  "普通类型"  "种类2""2e556d83-bb56-45b1-8d6e-00510902c464"  "普通类型"  "种类2""3ba00542-eac9-4399-9c2b-3b06e671f4c9"  "一般类型"  "种类2""5a5982d7-7257-422f-822a-a0c2f31c28d1"  "一般类型"  "种类2"
复制代码

SQL 连表

假设我们使用 SQL 的方式联表:


SELECT  *FROM  t_projectLEFT JOIN  t_project_infoON  t_project.projectCode=t_project_info.projectCode
复制代码

Reduce JOIN

有时候,表可能过大,无法支持我们使用 SQL 进行连表查询。这里我们编写一个程序来完成操作。

ProjectBean

这里是最终的 Bean 类,里边是两个表把字段补齐的结果,一会儿我们将使用这个类进行表的连接。


package icu.wzk.demo03;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;
public class ProjectBean implements Writable {
private String projectCode;
private String projectName;
private String projectType;
private String projectFrom;
private String flag;
@Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(projectCode); dataOutput.writeUTF(projectName); dataOutput.writeUTF(projectType); dataOutput.writeUTF(projectFrom); dataOutput.writeUTF(flag); }
@Override public void readFields(DataInput dataInput) throws IOException { this.projectCode = dataInput.readUTF(); this.projectName = dataInput.readUTF(); this.projectType = dataInput.readUTF(); this.projectFrom = dataInput.readUTF(); this.flag = dataInput.readUTF(); }
public ProjectBean(String projectCode, String projectName, String projectType, String projectFrom, String flag) { this.projectCode = projectCode; this.projectName = projectName; this.projectType = projectType; this.projectFrom = projectFrom; this.flag = flag; }
public ProjectBean() {
}
@Override public String toString() { return "ProjectBean{" + "projectCode='" + projectCode + '\'' + ", projectName='" + projectName + '\'' + ", projectType='" + projectType + '\'' + ", projectFrom='" + projectFrom + '\'' + ", flag=" + flag + '\'' + '}'; }
public String getProjectCode() { return projectCode; }
public void setProjectCode(String projectCode) { this.projectCode = projectCode; }
public String getProjectName() { return projectName; }
public void setProjectName(String projectName) { this.projectName = projectName; }
public String getProjectType() { return projectType; }
public void setProjectType(String projectType) { this.projectType = projectType; }
public String getProjectFrom() { return projectFrom; }
public void setProjectFrom(String projectFrom) { this.projectFrom = projectFrom; }
public String getFlag() { return flag; }
public void setFlag(String flag) { this.flag = flag; }}
复制代码

Reduce Driver

package icu.wzk.demo03;
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class ReducerJoinDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// String inputPath = args[0]; // String outputPath = args[1];
// === 测试环境 === String inputPath = "project_test"; String outputPath = "project_test_output"; // === ===
Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration, "ReducerJoinDriver"); job.setJarByClass(ReducerJoinDriver.class);
job.setMapperClass(ReducerJoinMapper.class); job.setReducerClass(ReducerJoinReducer.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(ProjectBean.class);
job.setOutputKeyClass(ProjectBean.class); job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath));
boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }
}
复制代码

ReduceMapper

package icu.wzk.demo03;
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ReducerJoinMapper extends Mapper<LongWritable, Text, Text, ProjectBean> {
String name; ProjectBean projectBean = new ProjectBean(); Text k = new Text();
@Override protected void setup(Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException { // 获取路径信息 name = context.getInputSplit().toString(); }
@Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, ProjectBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); if (name.contains("layout_project")) { // layout_project String[] fields = line.split("\t"); projectBean.setProjectCode(fields[0]); projectBean.setProjectName(fields[1]); projectBean.setProjectType(""); projectBean.setProjectFrom(""); projectBean.setFlag("layout_project"); // projectCode 关联 k.set(fields[0]); } else { // project_info String[] fields = line.split("\t"); projectBean.setProjectCode(fields[0]); projectBean.setProjectName(""); projectBean.setProjectType(fields[1]); projectBean.setProjectFrom(fields[2]); projectBean.setFlag("project_info"); // projectCode 关联 k.set(fields[0]); } context.write(k, projectBean); }}
复制代码

ReduceReducer

package icu.wzk.demo03;
import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;import java.util.ArrayList;import java.util.List;
public class ReducerJoinReducer extends Reducer<Text, ProjectBean, ProjectBean, NullWritable> {
@Override protected void reduce(Text key, Iterable<ProjectBean> values, Reducer<Text, ProjectBean, ProjectBean, NullWritable>.Context context) throws IOException, InterruptedException { List<ProjectBean> dataList = new ArrayList<>(); ProjectBean deviceProjectBean = new ProjectBean(); for (ProjectBean pb : values) { if ("layout_project".equals(pb.getFlag())) { // layout_project ProjectBean projectProjectBean = new ProjectBean( pb.getProjectCode(), pb.getProjectName(), pb.getProjectType(), pb.getProjectFrom(), pb.getFlag() ); dataList.add(projectProjectBean); } else { // project_info deviceProjectBean = new ProjectBean( pb.getProjectCode(), pb.getProjectName(), pb.getProjectType(), pb.getProjectFrom(), pb.getFlag() ); } }
for (ProjectBean pb : dataList) { pb.setProjectType(deviceProjectBean.getProjectType()); pb.setProjectFrom(deviceProjectBean.getProjectFrom()); context.write(pb, NullWritable.get()); } }}
复制代码

运行结果

ProjectBean{projectCode='"02d9c090-e467-42b6-9c14-52cacd72a4a8"', projectName='"社区项目2"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}ProjectBean{projectCode='"244dcaca-0778-4eec-b3a2-403f8fac1dfb"', projectName='"智慧社区"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}ProjectBean{projectCode='"2e556d83-bb56-45b1-8d6e-00510902c464"', projectName='"街道公交站点"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}ProjectBean{projectCode='"3ba00542-eac9-4399-9c2b-3b06e671f4c9"', projectName='"未命名项目1"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}ProjectBean{projectCode='"5a5982d7-7257-422f-822a-a0c2f31c28d1"', projectName='"未命名项目2"', projectType='"一般类型"', projectFrom='"种类2"', flag=layout_project'}ProjectBean{projectCode='"8aea9ba2-435c-48bd-9751-1cbd4c344d4e"', projectName='"社区项目1"', projectType='"重要类型"', projectFrom='"种类1"', flag=layout_project'}ProjectBean{projectCode='"94befb97-d1af-43f2-b5d5-6df9ce5b9393"', projectName='"公交站点"', projectType='"普通类型"', projectFrom='"种类1"', flag=layout_project'}ProjectBean{projectCode='"f44c8d10-bc92-4398-ad9b-8c11dd48ad7c"', projectName='"街道布建"', projectType='"普通类型"', projectFrom='"种类2"', flag=layout_project'}
复制代码


方案缺点

JOIN 操作是在 reduce 阶段完成的,reduce 端处理压力过大map节点的运算负载很低,资源利用不高

发布于: 刚刚阅读数: 6
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-11-MapReduce JOIN 操作的Java实现 Driver Mapper Reducer具体实现逻辑 模拟SQL进行联表操作_Java_武子康_InfoQ写作社区