Reduce 端 Join
概述
多个表文件根据关联条件(map 输出的 key), 在 reduce 端进行 join 的场景
使用场景
有两个表: 商品表, 订单表, 示例数据如下:
如果数据量很大, 两个表的数据是以文件的形式存储在 HDFS 中, 需要实现如下 SQL:
select a.id,a.date,b.pname,b.categoryid,b.price from torder a left join t_product b on a.pid = b.id;
这个时候需要使用 MapReduce 来实现两个表的 Join
需求: 使用 MapReduce 输出上面 SQL 的结果, 给订单加上商品名称, 商品分类, 商品价格 (id,date,pname,category_id,price)
实现思路
通过将关联条件 tproduct.id 与 torder.pid 作为 map 输出的 key,这样 ,相同 pid 的 tproduct 与 torder 数据就会发往同一个 reduce task,在 ReduceTask 中, 相同 key 的数据, 就会放到一个 Iterable<Text> values 中; 然后, 将其中 value 数据取出组装成需要的格式, 输出到结果文件.
1, Mapper: (1)读入商品表和订单表文件; (2)输出格式: key=join 字段 pid, value=商品表和订单表的行 value;
2, Reducer: (1)根据 key 的不同来源文件(商品表 or 订单表),定义相应的字段 ; (2)输出符合需求的合适
具体实现, 查看如下代码:
代码实现
0, 样例数据
product.txt
P0001 华为Mate40Pro c00001 7999
P0002 苹果Iphone12 c00001 7999
P0003 小米10 c00001 4999
复制代码
order.txt
100001 20201231 P0001 1
100002 20201231 P0002 1
100003 20201231 P0003 2
复制代码
1, Mapper 逻辑
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1, 判断value数据来自哪个文件, 确定拆分逻辑
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String fileName = fileSplit.getPath().getName();
Text keyOut = new Text();
String[] fields = value.toString().split("\t");
if ("product.txt".equalsIgnoreCase(fileName)) {
//2, 来源于Product.txt
/* 样例数据
P0001 华为Mate40Pro c00001 7999
P0002 苹果Iphone12 c00001 7999
P0003 小米10 c00001 4999
*/
keyOut.set(fields[0]);
value.set("product\t" + value.toString());
} else {
//3, 来源于Order.txt
/* 样例数据
100001 20201231 P0001 1
100002 20201231 P0002 1
100003 20201231 P0003 2
*/
keyOut.set(fields[2]);
value.set("order\t" + value.toString());
}
context.write(keyOut, value);
}
}
复制代码
2, Reducer 逻辑
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class ReduceJoinReducer extends Reducer<Text, Text, NullWritable, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text valueOut = new Text();
String order = "";
String product = "";
//1, 判断value是来自哪个表(Product.txt or Order.txt)
for (Text value : values) {
if (value.toString().startsWith("product")) {
//2, 来源于Product.txt, 则将数据定义header
/* 样例数据
product P0001 华为Mate40Pro c00001 7999
product P0002 苹果Iphone12 c00001 7999
product P0003 小米10 c00001 4999
*/
product += value.toString().replace("product", "");
} else {
//3, 来源于Order.txt, 则将value赋值给content
/* 样例数据
order 100001 20201231 P0001 1
order 100002 20201231 P0002 1
order 100003 20201231 P0003 2
*/
order += value.toString().replace("order", "");
}
}
//4, 输出header:content
valueOut.set(order + "\t" + product);
context.write(NullWritable.get(), valueOut);
}
}
复制代码
3, Job 启动类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//创建job
Job job = Job.getInstance(super.getConf(), "ReduceJoin_job");
//1, 设置输入文件类, 输入目录
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("D:\\Source\\data\\data_in_reduceJoin"));
//2, 设置Mapper类, Mapper任务是输出格式
job.setMapperClass(ReduceJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//3, 设置分区; 4,排序; 5,规约; 6,分组
//7, 设置Reducer类, Reducer任务的输出格式
job.setReducerClass(ReduceJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//8, 设置输出文件类, 输出目录
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("D:\\Source\\data\\data_out_reduceJoin"));
//启动Job
boolean completion = job.waitForCompletion(true);
return completion ? 1 : 0;
}
public static void main(String[] args) {
int run = 0;
try {
run = ToolRunner.run(new Configuration(), new JobMain(), args);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(run);
}
}
复制代码
4, 输出结果
100001 20201231 P0001 1 P0001 华为Mate40Pro c00001 7999
100002 20201231 P0002 1 P0002 苹果Iphone12 c00001 7999
100003 20201231 P0003 2 P0003 小米10 c00001 4999
复制代码
评论