写点什么

大数据知识专栏 - MapReduce 的自定义分组求 TopN

用户头像
小马哥
关注
发布于: 2021 年 01 月 28 日
大数据知识专栏 - MapReduce 的自定义分组求TopN

自定义分组求取 topN


分组是 mapreduce 当中 reduce 端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次 reduce 的逻辑,默认是每个不同的 key,作为多个不同的组,每个组调用一次 reduce 逻辑,我们可以自定义分组实现不同的 key 作为同一个组,调用一次 reduce 逻辑


*3.1 需求*


有如下订单数据


| 订单 id | 商品 id | 成交金额 |

| ------------- | ------ | -------- |

| Order0000001 | Pdt01 | 222.8 |

| Order0000001 | Pdt05 | 25.8 |

| Order0000002 | Pdt03 | 522.8 |

| Order0000002 | Pdt04 | 122.4 |

| Order0000002 | Pdt05 | 722.4 |

| Order0000003 | Pdt01 | 222.8 |


现在需要求出每一个订单中成交金额最大的一笔交易


*3.2 分析*


1、利用“订单 id 和成交金额”作为 key,可以将 map 阶段读取到的所有订单数据按照 id 分区,按照金额排序,发送到 reduce


2、在 reduce 端利用分组将订单 id 相同的 kv 聚合成组,然后取第一个即是最大值


*3.3 实现*


*第一步:*定义 OrderBean


定义一个 OrderBean,里面定义两个字段,第一个字段是我们的 orderId,第二个字段是我们的金额(注意金额一定要使用 Double 或者 DoubleWritable 类型,否则没法按照金额顺序排序)


~~~java

public class OrderBean implements WritableComparable<OrderBean>{

private String orderId;

private Double price;


public String getOrderId() {

return orderId;

}


public void setOrderId(String orderId) {

this.orderId = orderId;

}


public Double getPrice() {

return price;

}


public void setPrice(Double price) {

this.price = price;

}


@Override

public String toString() {

return orderId + "\t" + price;

}


//指定排序规则

@Override

public int compareTo(OrderBean orderBean) {

//先比较订单 ID,如果订单 ID 一致,则排序订单金额(降序)

int i = this.orderId.compareTo(orderBean.orderId);

if(i == 0){

i = this.price.compareTo(orderBean.price) * -1;

}

return i;

}


//实现对象的序列化

@Override

public void write(DataOutput out) throws IOException {

out.writeUTF(orderId);

out.writeDouble(price);

}


//实现对象反序列化

@Override

public void readFields(DataInput in) throws IOException {

this.orderId = in.readUTF();

this.price = in.readDouble();

}

}

~~~


第二步: 定义 Mapper 类


~~~java

public class GroupMapper extends Mapper<LongWritable,Text,OrderBean,Text> {

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1:拆分行文本数据,得到订单的 ID,订单的金额

String[] split = value.toString().split("\t");


//2:封装 OrderBean,得到 K2

OrderBean orderBean = new OrderBean();

orderBean.setOrderId(split[0]);

orderBean.setPrice(Double.valueOf(split[2]));


//3:将 K2 和 V2 写入上下文中

context.write(orderBean, value);

}

}

~~~


*第三步:*自定义分区


自定义分区,按照订单 id 进行分区,把所有订单 id 相同的数据,都发送到同一个 reduce 中去


~~~java

public class OrderPartition extends Partitioner<OrderBean,Text> {

//分区规则: 根据订单的 ID 实现分区


/**

*

* @param orderBean K2

* @param text V2

* @param i ReduceTask 个数

* @return 返回分区的编号

*/

@Override

public int getPartition(OrderBean orderBean, Text text, int i) {

return (orderBean.getOrderId().hashCode() & 2147483647) % i;

}

}


~~~


*第四步:*自定义分组


按照我们自己的逻辑进行分组,通过比较相同的订单 id,将相同的订单 id 放到一个组里面去,进过分组之后当中的数据,已经全部是排好序的数据,我们只需要取前 topN 即可


~~~java

// 1: 继承 WriteableComparator

public class OrderGroupComparator extends WritableComparator {

// 2: 调用父类的有参构造

public OrderGroupComparator() {

super(OrderBean.class,true);

}


//3: 指定分组的规则(重写方法)

@Override

public int compare(WritableComparable a, WritableComparable b) {

//3.1 对形参做强制类型转换

OrderBean first = (OrderBean)a;

OrderBean second = (OrderBean)b;


//3.2 指定分组规则

return first.getOrderId().compareTo(second.getOrderId());

}

}

~~~


第五步:定义 Reducer 类


~~~java

public class GroupReducer extends Reducer<OrderBean,Text,Text,NullWritable> {

@Override

protected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

int i = 0;

//获取集合中的前 N 条数据

for (Text value : values) {

context.write(value, NullWritable.get());

i++;

if(i >= 1){

break;

}

}

}

}

~~~


*第六步:*程序 main 函数入口


~~~java

public class JobMain extends Configured implements Tool {

@Override

public int run(String[] args) throws Exception {

//1:获取 Job 对象

Job job = Job.getInstance(super.getConf(), "mygroup_job");


//2:设置 job 任务

//第一步:设置输入类和输入路径

job.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\mygroup_input"));


//第二步:设置 Mapper 类和数据类型

job.setMapperClass(GroupMapper.class);

job.setMapOutputKeyClass(OrderBean.class);

job.setMapOutputValueClass(Text.class);


//第三,四,五,六

//设置分区

job.setPartitionerClass(OrderPartition.class);

//设置分组

job.setGroupingComparatorClass(OrderGroupComparator.class);


//第七步:设置 Reducer 类和数据类型

job.setReducerClass(GroupReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);


//第八步:设置输出类和输出的路径

job.setOutputFormatClass(TextOutputFormat.class);

TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\mygroup_out"));


//3:等待 job 任务结束

boolean bl = job.waitForCompletion(true);


return bl ? 0: 1;

}


public static void main(String[] args) throws Exception {

Configuration configuration = new Configuration();


//启动 job 任务

int run = ToolRunner.run(configuration, new JobMain(), args);


System.exit(run);

}

}


~~~


发布于: 2021 年 01 月 28 日阅读数: 21
用户头像

小马哥

关注

自强不息,厚德载物 2018.12.22 加入

像一棵竹子那样, 不断的扎根积累, 活出节节高的人生!

评论 (1 条评论)

发布
用户头像
拷贝MD格式的文档, 糊了. 平台能否解决?
2021 年 01 月 29 日 00:22
回复
没有更多了
大数据知识专栏 - MapReduce 的自定义分组求TopN