写点什么

大数据知识专栏 - MapReduce 的 Map 端 Join

用户头像
小马哥
关注
发布于: 2021 年 01 月 22 日
大数据知识专栏 - MapReduce 的 Map端Join

接上一篇 Reduce 端 Join, 当进行表文件 join 的时候, 如果其中有一个是小表(文件小), 可以将小表加载到分布式缓存中, 通过 Map 端 join 来减少 shuffle 过程网络数据传输, 达到 shuffle 调优的目的


Map 端 Join

概述

Reduce 端 Join: 需要数据在 map 端经过 shuffle 发送到 reduce 端, 增加网络传输, 如果可以不发送到 reduce 端, 而在 map 端即进行 join, 可以提高效率.


前提:有小表, 小表放分布式缓存

实现原理

​ 适用于关联表中有小表的情形. 使用分布式缓存, 可以将小表分发到所有的 map 节点,这样,map 节点就可以在本地对自己所读到的大表数据进行 join 并输出最终结果,可以大大提高 join 操作的并发度,加快处理速度.

应用举例


需求: 使用 MapReduce 输出上面 SQL 的结果, 给订单加上商品名称, 商品分类, 商品价格 (id,date,pname,category_id,price)


样例数据:

product.txt

P0001	华为Mate40Pro	c00001	7999P0002	苹果Iphone12	c00001	7999P0003	小米10	c00001	4999
复制代码


order.txt

100001	20201231	P0001	1100002	20201231	P0002	1100003	20201231	P0003	2
复制代码


1, Mapper 类

import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.URI;import java.util.HashMap;
public class MapperJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> { private HashMap<String, String> productMap = new HashMap<String, String>();
// 读取分布式缓存中的小表数据 @Override protected void setup(Context context) throws IOException, InterruptedException { //1, 获取分布式缓存文件列表 URI[] cacheFiles = context.getCacheFiles(); //2, 获取小表所在的文件系统 URI cacheFile = cacheFiles[0]; FileSystem fileSystem = FileSystem.get(cacheFile, context.getConfiguration()); //3, 获取指向小表文件的输入流 FSDataInputStream inputStream = fileSystem.open(new Path(cacheFile)); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); //4, 读取文件的每一行内容, 保存数据到全局变量Map集合<id,行内容> String line = null; while ((line = reader.readLine()) != null) { String[] product = line.split("\t"); productMap.put(product[0], line); } //5, 关闭资源 reader.close(); fileSystem.close(); }
// 读取大表数据, 并与缓存中读取到的小表数据做join操作 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1,在代表大表Order的行文本中获取到商品id, 作为key String[] fields = value.toString().split("\t"); String productID = fields[2]; //2, 在小表Map集合中获取到商品id对应的值, 作为value String productInfor = productMap.get(productID); //3, 将key,value写入到上下文中 context.write(NullWritable.get(), new Text(productInfor + "\t" + value.toString())); }}
复制代码


2, Job 启动类

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.net.URI;
public class MainJob extends Configured implements Tool {
@Override public int run(String[] args) throws Exception { //获取Job对象 Job job = Job.getInstance(super.getConf(), "MappJoin");
// *关键: * 将小表放入到分布式缓存中 job.addCacheFile(new URI("hdfs://hdfs小表文件地址"));
//1,设置输入类和文件输入路径 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("输入文件路径"));
//2, 设置Mapper类和数据类型 job.setMapperClass(MapperJoinMapper.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class);
//3, 设置输出类和文件输出路径 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("输出文件路径"));
//启动任务 boolean completion = job.waitForCompletion(true); return completion?1:0; }
public static void main(String[] args) { Configuration configuration = new Configuration(); int runResult = 0; try { runResult = ToolRunner.run(configuration, new MainJob(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(runResult); }}
复制代码


发布于: 2021 年 01 月 22 日阅读数: 18
用户头像

小马哥

关注

自强不息,厚德载物 2018.12.22 加入

像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!

评论

发布
暂无评论
大数据知识专栏 - MapReduce 的 Map端Join