import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
public class MapperJoinMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    // Holds the small (product) table in memory: <product id, full line>
    private HashMap<String, String> productMap = new HashMap<String, String>();

    // Load the small-table data from the distributed cache
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 1. Get the list of distributed cache files
        URI[] cacheFiles = context.getCacheFiles();

        // 2. Get the file system that holds the small table
        URI cacheFile = cacheFiles[0];
        FileSystem fileSystem = FileSystem.get(cacheFile, context.getConfiguration());

        // 3. Open an input stream on the small-table file
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFile));
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

        // 4. Read every line and store it in the in-memory map as <product id, line>
        String line = null;
        while ((line = reader.readLine()) != null) {
            String[] product = line.split("\t");
            productMap.put(product[0], line);
        }

        // 5. Release resources. Only the reader is closed: FileSystem.get() returns a
        //    cached, shared instance, and closing it here can break the task's own
        //    HDFS access later in the job.
        reader.close();
    }

    // Read the big-table (Order) data and join it with the cached small-table data
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Extract the product id from the big-table (Order) line; it is the join key
        String[] fields = value.toString().split("\t");
        String productID = fields[2];

        // 2. Look up the product id in the small-table map to get the matching product record
        String productInfo = productMap.get(productID);
        if (productInfo == null) {
            // Skip orders with no matching product (inner-join semantics)
            return;
        }

        // 3. Write the joined record to the context
        context.write(NullWritable.get(), new Text(productInfo + "\t" + value.toString()));
    }
}
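For context, here is a minimal driver sketch showing how the small table would be registered in the distributed cache so that context.getCacheFiles() in setup() can find it. The class name MapperJoinDriver, the node01:8020 namenode address, and the product/order/output paths are illustrative assumptions, not part of the original code.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapperJoinDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "map_side_join");
        job.setJarByClass(MapperJoinDriver.class);

        // Register the small (product) table in the distributed cache;
        // the mapper reads it back via context.getCacheFiles() in setup().
        // The path is a placeholder assumption.
        job.addCacheFile(new URI("hdfs://node01:8020/cache/product.txt"));

        // Map-side join needs no reduce phase
        job.setMapperClass(MapperJoinMapper.class);
        job.setNumReduceTasks(0);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Big-table (Order) input and join output; paths are placeholder assumptions
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/orders"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/output/map_join"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because the join happens entirely in the mapper, setting the number of reduce tasks to 0 writes the joined records directly from the map output.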