
MapReduce Advanced: Partitioning, Sorting, Combine, Shuffle


1. Partitioning

1.1 First analyze the concrete business logic and estimate how many partitions are needed.
1.2 Write a class that extends org.apache.hadoop.mapreduce.Partitioner.
1.3 Override the public int getPartition method; following the business logic (for example, a database or configuration lookup), return the same partition number for keys that belong in the same partition.
1.4 In the main method, register the partitioner class: job.setPartitionerClass(DataPartitioner.class);
1.5 Set the number of reducers: job.setNumReduceTasks(6);

The complete driver, Mapper, Reducer, and Partitioner are shown below.


import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// DataInfo is a custom Writable (tel, upPayLoad, downPayLoad) defined
// elsewhere in the project; its definition is not shown in this post.
public class DataCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DataCount.class);

        job.setMapperClass(DCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataInfo.class);

        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataInfo.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setPartitionerClass(DCPartitioner.class);
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        job.waitForCompletion(true);
    }

    // Map: parse one tab-separated log line and emit <tel, DataInfo>
    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataInfo> {
        private Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String tel = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            DataInfo dataInfo = new DataInfo(tel, up, down);
            k.set(tel);
            context.write(k, dataInfo);
        }
    }

    // Reduce: sum upstream/downstream traffic per phone number
    public static class DCReducer extends Reducer<Text, DataInfo, Text, DataInfo> {
        @Override
        protected void reduce(Text key, Iterable<DataInfo> values, Context context)
                throws IOException, InterruptedException {
            long up_sum = 0;
            long down_sum = 0;
            for (DataInfo d : values) {
                up_sum += d.getUpPayLoad();
                down_sum += d.getDownPayLoad();
            }
            DataInfo dataInfo = new DataInfo("", up_sum, down_sum);
            context.write(key, dataInfo);
        }
    }

    // Partitioner: route records to reducers by phone-number prefix
    public static class DCPartitioner extends Partitioner<Text, DataInfo> {
        private static Map<String, Integer> provider = new HashMap<String, Integer>();

        static {
            provider.put("138", 1);
            provider.put("139", 1);
            provider.put("152", 2);
            provider.put("153", 2);
            provider.put("182", 3);
            provider.put("183", 3);
        }

        @Override
        public int getPartition(Text key, DataInfo value, int numPartitions) {
            // The prefix-to-partition mapping could also be read from a
            // database or from configuration.
            String tel_sub = key.toString().substring(0, 3);
            Integer count = provider.get(tel_sub);
            if (count == null) {
                count = 0; // unknown prefixes fall into partition 0
            }
            return count;
        }
    }
}
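The driver takes three arguments: input path, output path, and the reducer count. Because getPartition can return 0 through 3, the reducer count should be at least 4; with fewer reducers the job fails with an "Illegal partition" error, while extra reducers simply write empty output files. A hypothetical invocation (the jar name and paths are placeholders):

hadoop jar datacount.jar DataCount /data/flow/in /data/flow/out 4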


2. Sorting

By default, MR sorts by k2, the map output key. To customize the sort order, the object being sorted must implement the WritableComparable interface and define the ordering rule in its compareTo method; make that object the k2 and the framework sorts it automatically, as the InfoBean example below shows.


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class InfoBean implements WritableComparable<InfoBean> {

    private String account;
    private double income;
    private double expenses;
    private double surplus;

    public void set(String account, double income, double expenses) {
        this.account = account;
        this.income = income;
        this.expenses = expenses;
        this.surplus = income - expenses;
    }

    // Serialization: write the fields in a fixed order...
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(account);
        out.writeDouble(income);
        out.writeDouble(expenses);
        out.writeDouble(surplus);
    }

    // ...and read them back in exactly the same order.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.account = in.readUTF();
        this.income = in.readDouble();
        this.expenses = in.readDouble();
        this.surplus = in.readDouble();
    }

    // Sort rule: ascending by income, ties broken by expenses. Note that
    // this never returns 0, so no two records compare as equal and none
    // are merged together during reduce-side grouping.
    @Override
    public int compareTo(InfoBean o) {
        if (this.income == o.getIncome()) {
            return this.expenses > o.getExpenses() ? 1 : -1;
        }
        return this.income > o.getIncome() ? 1 : -1;
    }

    @Override
    public String toString() {
        return income + "\t" + expenses + "\t" + surplus;
    }

    public String getAccount() { return account; }
    public void setAccount(String account) { this.account = account; }
    public double getIncome() { return income; }
    public void setIncome(double income) { this.income = income; }
    public double getExpenses() { return expenses; }
    public void setExpenses(double expenses) { this.expenses = expenses; }
    public double getSurplus() { return surplus; }
    public void setSurplus(double surplus) { this.surplus = surplus; }
}

// The mapper and reducer below belong inside the job's driver class.
// Map: parse <account, income, expenses> and emit the InfoBean as k2,
// so the framework sorts by InfoBean.compareTo during the shuffle.
public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
    private InfoBean k = new InfoBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        k.set(fields[0], Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
        context.write(k, NullWritable.get());
    }
}

// Reduce: move the account back into the key position for output.
public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
    private Text k = new Text();

    @Override
    protected void reduce(InfoBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        k.set(key.getAccount());
        context.write(k, key);
    }
}
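The post does not show a driver for the sort job, so here is a minimal sketch of one, assuming the same two-argument layout (input path, output path) and the same imports as the partitioning example above:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    job.setJarByClass(InfoBean.class);

    job.setMapperClass(SortMapper.class);
    // Using InfoBean as the map output key is what triggers the
    // compareTo-based sorting during the shuffle.
    job.setMapOutputKeyClass(InfoBean.class);
    job.setMapOutputValueClass(NullWritable.class);

    job.setReducerClass(SortReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(InfoBean.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}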


3. Combine

The combiner's job is to pre-merge the map output on the map side, reducing the volume of data transferred to the reducers.

job.setCombinerClass(WCReducer.class);

// submit the job
job.waitForCompletion(true);
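Reusing the reduce class as the combiner, as above, only works when the reduce logic is commutative and associative (sums, counts, max/min), because the framework may run the combiner zero, one, or several times on partial data. WCReducer is not defined in this post; presumably it is the classic word-count reducer, sketched below with the same Hadoop imports as the earlier examples:

public static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable v : values) {
            sum += v.get();
        }
        // Emitting partial sums from the map side cannot change the
        // final result, which is what makes this reducer combiner-safe.
        context.write(key, new LongWritable(sum));
    }
}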

