GroupingComparator 分组
[](()GroupingComparator 分组介绍
[](()概念
GroupingComparator 分组用来辅助排序,对 Reduce 阶段的数据根据某一个或几个字段进行分组。
[](()分组排序步骤
(1)自定义类继承 WritableComparator
(2)重写 compare()方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}
(3)创建一个构造将比较对象的类传给父类
[](()GroupingComparator 分组案例
[](()需求
[](()1. 需求说明
求出表中每一个订单中最贵的商品
[](()2. 文件
[](()案例分析
[](()1. 需求分析
(1)利用“订单 id 和成交金额”作为 key,可以将 Map 阶段读取到的所有订单数据按照 id 升序排序,如果 id 相同再按照金额降序排序,发送到 Reduce。
(2)在 Reduce 端利用 groupingComparator 将订单 id 相同的 kv 聚合成组,然后取第一个即是该订单中最贵商品
[](()2. 输入数据
[](()3. 期望输出数据
[](()4. Map 阶段
(1)Map 阶段处理的事情
获取一行
切割出每个字段
一行封装成一个 bean
(2)二次排序
先根据订单 id 排序
Id 相同再根据价格降序排序
[](()5. Ruduce 阶段
辅助排序
对从 map 阶段拉过的数据再次进行排序,只要 id 相同就认为是相同的 key
Reduce 方法只把一组的 key 的第一个写进去
[](()代码实现
[](()1. 定义订单信息 orderBean 类
package com.atguigu.mr.order;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo.Bean;
import org.apache.hadoop.io.WritableComparable;
public class OrderBean implements WritableComparable<OrderBean>{
private int order_id; //订单 id
private double price; //订单价格
public OrderBean() {
super();
}
public OrderBean(int order_id, double price) {
super();
this.order_id = order_id;
this.price = price;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(order_i 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 d);
out.writeDouble(price);
}
@Override
public void readFields(DataInput in) throws IOException {
order_id = in.readInt();
price = in.readDouble();
}
@Override
public int compareTo(OrderBean bean) {
//1 先按照 id 升序排序,再按照价格排序
int result;
if (order_id > bean.getOrder_id() ) {
result = 1;
}else if (order_id < bean.getOrder_id() ) {
result = -1;
}else {
if (price < bean.getPrice()) {
result = 1;
}else if (price > bean.getPrice()) {
result = -1;
}else {
result = 0;
}
}
return result;
}
public int getOrder_id() {
return order_id;
}
public void setOrder_id(int order_id) {
this.order_id = order_id;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
@Override
public String toString() {
return order_id + "\t"+ price;
}
}
[](()2. 编写 OrderSortMapper 类
package com.atguigu.mr.order;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
OrderBean k = new OrderBean();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)
throws IOException, InterruptedException {
//获取一行
String line = value.toString();
//切割
String[] fields = line.split("\t");
//封装对象
k.setOrder_id(Integer.parseInt(fields[0]));
k.setPrice(Double.parseDouble(fields[2]));
//写出
context.write(k, NullWritable.get());
}
}
评论