写点什么

大数据知识专栏 - MapReduce 的 Reduce 端 Join

用户头像
小马哥
关注
发布于: 2021 年 01 月 21 日
大数据知识专栏 - MapReduce 的 Reduce端Join

Reduce 端 Join

概述

多个表文件根据关联条件(map 输出的 key), 在 reduce 端进行 join 的场景

使用场景


有两个表: 商品表, 订单表, 示例数据如下:

如果数据量很大, 两个表的数据是以文件的形式存储在 HDFS 中, 需要实现如下 SQL:

select a.id,a.date,b.pname,b.categoryid,b.price from torder a left join t_product b on a.pid = b.id;

这个时候需要使用 MapReduce 来实现两个表的 Join


需求: 使用 MapReduce 输出上面 SQL 的结果, 给订单加上商品名称, 商品分类, 商品价格 (id,date,pname,category_id,price)

实现思路


通过将关联条件 tproduct.id 与 torder.pid 作为 map 输出的 key,这样 ,相同 pid 的 tproduct 与 torder 数据就会发往同一个 reduce task,在 ReduceTask 中, 相同 key 的数据, 就会放到一个 Iterable<Text> values 中; 然后, 将其中 value 数据取出组装成需要的格式, 输出到结果文件.


1, Mapper: (1)读入商品表和订单表文件; (2)输出格式: key=join 字段 pid, value=商品表和订单表的行 value;

2, Reducer: (1)根据 key 的不同来源文件(商品表 or 订单表),定义相应的字段 ; (2)输出符合需求的合适

具体实现, 查看如下代码:


代码实现

0, 样例数据

product.txt

P0001	华为Mate40Pro	c00001	7999P0002	苹果Iphone12	c00001	7999P0003	小米10	c00001	4999
复制代码

order.txt

100001	20201231	P0001	1100002	20201231	P0002	1100003	20201231	P0003	2
复制代码


1, Mapper 逻辑

import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1, 判断value数据来自哪个文件, 确定拆分逻辑 FileSplit fileSplit = (FileSplit) context.getInputSplit(); String fileName = fileSplit.getPath().getName(); Text keyOut = new Text(); String[] fields = value.toString().split("\t"); if ("product.txt".equalsIgnoreCase(fileName)) { //2, 来源于Product.txt /* 样例数据 P0001 华为Mate40Pro c00001 7999 P0002 苹果Iphone12 c00001 7999 P0003 小米10 c00001 4999 */ keyOut.set(fields[0]); value.set("product\t" + value.toString()); } else { //3, 来源于Order.txt /* 样例数据 100001 20201231 P0001 1 100002 20201231 P0002 1 100003 20201231 P0003 2 */ keyOut.set(fields[2]); value.set("order\t" + value.toString()); } context.write(keyOut, value); }}
复制代码


2, Reducer 逻辑

import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ReduceJoinReducer extends Reducer<Text, Text, NullWritable, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Text valueOut = new Text(); String order = ""; String product = ""; //1, 判断value是来自哪个表(Product.txt or Order.txt) for (Text value : values) { if (value.toString().startsWith("product")) { //2, 来源于Product.txt, 则将数据定义header /* 样例数据 product P0001 华为Mate40Pro c00001 7999 product P0002 苹果Iphone12 c00001 7999 product P0003 小米10 c00001 4999 */ product += value.toString().replace("product", ""); } else { //3, 来源于Order.txt, 则将value赋值给content /* 样例数据 order 100001 20201231 P0001 1 order 100002 20201231 P0002 1 order 100003 20201231 P0003 2 */ order += value.toString().replace("order", ""); } } //4, 输出header:content valueOut.set(order + "\t" + product); context.write(NullWritable.get(), valueOut); }}
复制代码


3, Job 启动类

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
@Override public int run(String[] args) throws Exception { //创建job Job job = Job.getInstance(super.getConf(), "ReduceJoin_job");
//1, 设置输入文件类, 输入目录 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("D:\\Source\\data\\data_in_reduceJoin"));
//2, 设置Mapper类, Mapper任务是输出格式 job.setMapperClass(ReduceJoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class);
//3, 设置分区; 4,排序; 5,规约; 6,分组
//7, 设置Reducer类, Reducer任务的输出格式 job.setReducerClass(ReduceJoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
//8, 设置输出文件类, 输出目录 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\data_out_reduceJoin")); //启动Job boolean completion = job.waitForCompletion(true); return completion ? 1 : 0; }
public static void main(String[] args) { int run = 0; try { run = ToolRunner.run(new Configuration(), new JobMain(), args); } catch (Exception e) { e.printStackTrace(); } System.exit(run); }}
复制代码


4, 输出结果

100001	20201231	P0001	1	P0001	华为Mate40Pro	c00001	7999100002	20201231	P0002	1	P0002	苹果Iphone12	c00001	7999100003	20201231	P0003	2	P0003	小米10	c00001	4999
复制代码


发布于: 2021 年 01 月 21 日阅读数: 19
用户头像

小马哥

关注

自强不息,厚德载物 2018.12.22 加入

像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!

评论

发布
暂无评论
大数据知识专栏 - MapReduce 的 Reduce端Join