写点什么

GroupingComparator 分组

  • 2022 年 5 月 05 日
  • 本文字数:1599 字

    阅读完需:约 5 分钟


[](()GroupingComparator 分组介绍



[](()概念

GroupingComparator 分组用来辅助排序,对 Reduce 阶段的数据根据某一个或几个字段进行分组。

[](()分组排序步骤

(1)自定义类继承 WritableComparator


(2)重写 compare()方法


@Override


public int compare(WritableComparable a, WritableComparable b) {


// 比较的业务逻辑


return result;


}


(3)创建一个构造将比较对象的类传给父类


[](()GroupingComparator 分组案例



[](()需求

[](()1. 需求说明

求出表中每一个订单中最贵的商品

[](()2. 文件

[](()案例分析

[](()1. 需求分析

(1)利用“订单 id 和成交金额”作为 key,可以将 Map 阶段读取到的所有订单数据按照 id 升序排序,如果 id 相同再按照金额降序排序,发送到 Reduce。


(2)在 Reduce 端利用 groupingComparator 将订单 id 相同的 kv 聚合成组,然后取第一个即是该订单中最贵商品

[](()2. 输入数据

[](()3. 期望输出数据

[](()4. Map 阶段

(1)Map 阶段处理的事情


  • 获取一行

  • 切割出每个字段

  • 一行封装成一个 bean


(2)二次排序


  • 先根据订单 id 排序

  • Id 相同再根据价格降序排序

[](()5. Ruduce 阶段

辅助排序


  • 对从 map 阶段拉过的数据再次进行排序,只要 id 相同就认为是相同的 key

  • Reduce 方法只把一组的 key 的第一个写进去

[](()代码实现

[](()1. 定义订单信息 orderBean 类

package com.atguigu.mr.order;


import java.io.DataInput;


import java.io.DataOutput;


import java.io.IOException;


import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo.Bean;


import org.apache.hadoop.io.WritableComparable;


public class OrderBean implements WritableComparable<OrderBean>{


private int order_id; //订单 id


private double price; //订单价格


public OrderBean() {


super();


}


public OrderBean(int order_id, double price) {


super();


this.order_id = order_id;


this.price = price;


}


@Override


public void write(DataOutput out) throws IOException {


out.writeInt(order_i 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 d);


out.writeDouble(price);


}


@Override


public void readFields(DataInput in) throws IOException {


order_id = in.readInt();


price = in.readDouble();


}


@Override


public int compareTo(OrderBean bean) {


//1 先按照 id 升序排序,再按照价格排序


int result;


if (order_id > bean.getOrder_id() ) {


result = 1;


}else if (order_id < bean.getOrder_id() ) {


result = -1;


}else {


if (price < bean.getPrice()) {


result = 1;


}else if (price > bean.getPrice()) {


result = -1;


}else {


result = 0;


}


}


return result;


}


public int getOrder_id() {


return order_id;


}


public void setOrder_id(int order_id) {


this.order_id = order_id;


}


public double getPrice() {


return price;


}


public void setPrice(double price) {


this.price = price;


}


@Override


public String toString() {


return order_id + "\t"+ price;


}


}

[](()2. 编写 OrderSortMapper 类

package com.atguigu.mr.order;


import java.io.IOException;


import org.apache.hadoop.io.LongWritable;


import org.apache.hadoop.io.NullWritable;


import org.apache.hadoop.io.Text;


import org.apache.hadoop.mapreduce.Mapper;


public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{


OrderBean k = new OrderBean();


@Override


protected void map(LongWritable key, Text value,


Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)


throws IOException, InterruptedException {


//获取一行


String line = value.toString();


//切割


String[] fields = line.split("\t");


//封装对象


k.setOrder_id(Integer.parseInt(fields[0]));


k.setPrice(Double.parseDouble(fields[2]));


//写出


context.write(k, NullWritable.get());


}


}

用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
GroupingComparator分组_Java_爱好编程进阶_InfoQ写作社区