import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
public class MapperJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    private HashMap<String, String> productMap = new HashMap<>();

    // Load the small (product) table from the distributed cache into memory
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 1. Get the list of files registered in the distributed cache
        URI[] cacheFiles = context.getCacheFiles();
        // 2. Get the file system that holds the small table
        URI cacheFile = cacheFiles[0];
        FileSystem fileSystem = FileSystem.get(cacheFile, context.getConfiguration());
        // 3. Open an input stream on the small table file
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFile));
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
        // 4. Read the file line by line and store <product id, full line> in the member map
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] product = line.split("\t");
            productMap.put(product[0], line);
        }
        // 5. Release resources; closing the reader also closes the underlying stream.
        //    The FileSystem returned by FileSystem.get() is a cached, shared instance,
        //    so it is deliberately not closed here (closing it can cause
        //    "Filesystem closed" errors when the task later reads its input split).
        reader.close();
    }
    // Read the big (order) table and join each record with the cached small table
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Extract the product id from the current order line of the big table
        String[] fields = value.toString().split("\t");
        String productID = fields[2];
        // 2. Look up the cached product record for this product id
        String productInfo = productMap.get(productID);
        if (productInfo == null) {
            // Skip orders with no matching product, so the output never contains a literal "null"
            return;
        }
        // 3. Write the joined record (product record + order record) to the context
        context.write(NullWritable.get(), new Text(productInfo + "\t" + value.toString()));
    }
}
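
For completeness, the mapper above needs a driver that registers the small table with the distributed cache and runs a map-only job. The sketch below is one possible setup, not part of the original code: the class name MapperJoinDriver, the namenode address node01:8020, and the cache, input, and output paths are all illustrative assumptions.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical driver: ships the small table to every map task and runs a map-only job.
public class MapperJoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "map side join");
        job.setJarByClass(MapperJoinDriver.class);

        // Register the small table as a cache file (assumed example path on HDFS)
        job.addCacheFile(new URI("hdfs://node01:8020/cache/pdt.txt"));

        job.setMapperClass(MapperJoinMapper.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // The join happens entirely on the map side, so no reducers are needed
        job.setNumReduceTasks(0);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/orders"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/output/map_join"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because setNumReduceTasks(0) makes this a map-only job, the joined records written by MapperJoinMapper go straight to the output directory with no shuffle or reduce phase.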