写点什么

10 亿行数据集处理挑战:从 15 分钟到 5 秒

作者:俞凡
  • 2024-11-25
    上海
  • 本文字数:6506 字

    阅读完需:约 21 分钟

Golang 在处理 10 亿行数据集的挑战中展现了高效的并发处理和优化的 I/O 操作能力,通过使用 Parquet 二进制格式,进一步提升了数据处理性能,并最终将处理时间从 15 分钟优化到了 5 秒。原文: Go One Billion Row Challenge — From 15 Minutes to 5 Seconds



10 亿行挑战在编程界引起了广泛关注,其主要目的是测试不同编程语言处理并汇总包含 10 亿行的海量数据集的速度,而以性能和并发能力著称的 Go 是有力竞争者。目前性能最好的 Java 实现处理数据的时间仅为 1.535 秒,我们看看 Go 的表现如何。


本文将基于 Go 特有的功能进行优化。注:所有基准数据都是在多次运行后计算得出的。


硬件设置


在配备 M2 Pro 芯片的 16 英寸 MacBook Pro 上进行了所有测试,该芯片拥有 12 个 CPU 核和 36 GB 内存。不同环境运行的测试结果可能因硬件而异,但相对性能差异应该差不多。

什么是 "10 亿行挑战"?

挑战很简单:处理包含 10 亿个任意温度测量值的文本文件,并计算每个站点的汇总统计数据(最小值、平均值和最大值)。问题在于如何高效处理如此庞大的数据集。


数据集通过代码仓库中的 createMeasurements.go 脚本生成。运行脚本后,将得到一个 12.8 GB 大的由分号分隔的文本文件(measurements.txt),包含两列数据:站点名称和测量值。


我们需要处理该文件,并输出以下结果:


{Station1=12.3/25.6/38.9, Station2=10.0/22.5/35.0, ...}
复制代码


我们看看如何实现这一目标。

Go 初始实现

单核版本

我们从基本的单线程实现开始。该版本逐行读取文件,解析数据,并更新映射以跟踪统计数据。


package main
import ( "bufio" "fmt" "os" "strconv" "strings")
type Stats struct { Min float64 Max float64 Sum float64 Count int64}
func processFile(filename string) { file, err := os.Open(filename) if err != nil { panic(err) } defer file.Close()
statsMap := make(map[string]*Stats) scanner := bufio.NewScanner(file)
for scanner.Scan() { line := scanner.Text() parts := strings.Split(line, ";") if len(parts) != 2 { continue }
station := parts[0] measurement, err := strconv.ParseFloat(parts[1], 64) if err != nil { continue }
stat, exists := statsMap[station] if !exists { statsMap[station] = &Stats{ Min: measurement, Max: measurement, Sum: measurement, Count: 1, } } else { if measurement < stat.Min { stat.Min = measurement } if measurement > stat.Max { stat.Max = measurement } stat.Sum += measurement stat.Count++ } }
if err := scanner.Err(); err != nil { panic(err) }
fmt.Print("{") for station, stat := range statsMap { mean := stat.Sum / float64(stat.Count) fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max) } fmt.Print("\b\b} \n")}
func main() { processFile("data/measurements.txt")}
复制代码
多核版本

为了充分利用多个 CPU 核,我们把文件分成若干块,然后利用 GoroutineChannel 并行处理。


package main
import ( "bufio" "fmt" "os" "runtime" "strconv" "strings" "sync")
type Stats struct { Min float64 Max float64 Sum float64 Count int64}
func worker(lines []string, wg *sync.WaitGroup, statsChan chan map[string]*Stats) { defer wg.Done() statsMap := make(map[string]*Stats)
for _, line := range lines { parts := strings.Split(line, ";") if len(parts) != 2 { continue }
station := parts[0] measurement, err := strconv.ParseFloat(parts[1], 64) if err != nil { continue }
stat, exists := statsMap[station] if !exists { statsMap[station] = &Stats{ Min: measurement, Max: measurement, Sum: measurement, Count: 1, } } else { if measurement < stat.Min { stat.Min = measurement } if measurement > stat.Max { stat.Max = measurement } stat.Sum += measurement stat.Count++ } }
statsChan <- statsMap}
func processFile(filename string) { file, err := os.Open(filename) if err != nil { panic(err) } defer file.Close()
numCPU := runtime.NumCPU() linesPerWorker := 1000000 scanner := bufio.NewScanner(file) lines := make([]string, 0, linesPerWorker)
var wg sync.WaitGroup statsChan := make(chan map[string]*Stats, numCPU)
go func() { wg.Wait() close(statsChan) }()
for scanner.Scan() { lines = append(lines, scanner.Text()) if len(lines) >= linesPerWorker { wg.Add(1) go worker(lines, &wg, statsChan) lines = make([]string, 0, linesPerWorker) } }
if len(lines) > 0 { wg.Add(1) go worker(lines, &wg, statsChan) }
if err := scanner.Err(); err != nil { panic(err) }
finalStats := make(map[string]*Stats) for partialStats := range statsChan { for station, stat := range partialStats { existingStat, exists := finalStats[station] if !exists { finalStats[station] = stat } else { if stat.Min < existingStat.Min { existingStat.Min = stat.Min } if stat.Max > existingStat.Max { existingStat.Max = stat.Max } existingStat.Sum += stat.Sum existingStat.Count += stat.Count } } }
fmt.Print("{") for station, stat := range finalStats { mean := stat.Sum / float64(stat.Count) fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max) } fmt.Print("\b\b} \n")}
func main() { processFile("data/measurements.txt")}
复制代码
Go 实现结果

单核和多核版本的运行结果分别如下:


  • 单核版本:15 分 30 秒

  • 多核版本:6 分 45 秒


虽然多核版本有明显改善,但处理数据仍然花了好几分钟。下面看看如何进一步优化。

利用 Go 的并发和缓冲 I/O 进行优化

为了提高性能,我们考虑利用缓冲 I/O,并优化 Goroutine。


package main
import ( "bufio" "fmt" "os" "runtime" "strconv" "strings" "sync")
type Stats struct { Min float64 Max float64 Sum float64 Count int64}
func worker(id int, jobs <-chan []string, results chan<- map[string]*Stats, wg *sync.WaitGroup) { defer wg.Done() for lines := range jobs { statsMap := make(map[string]*Stats) for _, line := range lines { parts := strings.Split(line, ";") if len(parts) != 2 { continue }
station := parts[0] measurement, err := strconv.ParseFloat(parts[1], 64) if err != nil { continue }
stat, exists := statsMap[station] if !exists { statsMap[station] = &Stats{ Min: measurement, Max: measurement, Sum: measurement, Count: 1, } } else { if measurement < stat.Min { stat.Min = measurement } if measurement > stat.Max { stat.Max = measurement } stat.Sum += measurement stat.Count++ } } results <- statsMap }}
func processFile(filename string) { file, err := os.Open(filename) if err != nil { panic(err) } defer file.Close()
numCPU := runtime.NumCPU() jobs := make(chan []string, numCPU) results := make(chan map[string]*Stats, numCPU)
var wg sync.WaitGroup for i := 0; i < numCPU; i++ { wg.Add(1) go worker(i, jobs, results, &wg) }
go func() { wg.Wait() close(results) }()
scanner := bufio.NewScanner(file) bufferSize := 1000000 lines := make([]string, 0, bufferSize)
for scanner.Scan() { lines = append(lines, scanner.Text()) if len(lines) >= bufferSize { jobs <- lines lines = make([]string, 0, bufferSize) } }
if len(lines) > 0 { jobs <- lines } close(jobs)
if err := scanner.Err(); err != nil { panic(err) }
finalStats := make(map[string]*Stats) for partialStats := range results { for station, stat := range partialStats { existingStat, exists := finalStats[station] if !exists { finalStats[station] = stat } else { if stat.Min < existingStat.Min { existingStat.Min = stat.Min } if stat.Max > existingStat.Max { existingStat.Max = stat.Max } existingStat.Sum += stat.Sum existingStat.Count += stat.Count } } }
fmt.Print("{") for station, stat := range finalStats { mean := stat.Sum / float64(stat.Count) fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max) } fmt.Print("\b\b} \n")}
func main() { processFile("data/measurements.txt")}
复制代码
优化 Go 实现结果

优化后,处理时间大大缩短:


  • 多核优化版:3 分 50 秒


确实获得了实质性改进,但仍然无法与最快的 Java 实现相媲美。

利用高效数据格式

由于文本文件不是处理大型数据集的最有效方式,我们考虑将数据转换为 Parquet 等二进制格式来提高效率。

转换为 Parquet


可以用 Apache Arrow 等工具将文本文件转换为 Parquet 文件。为简单起见,假设数据已转换为 measurements.parquet

用 Go 处理 Parquet 文件

我们用 parquet-go 库来读取 Parquet 文件。


package main
import ( "fmt" "log" "sort"
"github.com/xitongsys/parquet-go/reader" "github.com/xitongsys/parquet-go/source/local")
type Measurement struct { StationName string `parquet:"name=station_name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` Measurement float64 `parquet:"name=measurement, type=DOUBLE"`}
type Stats struct { Min float64 Max float64 Sum float64 Count int64}
func processParquetFile(filename string) { fr, err := local.NewLocalFileReader(filename) if err != nil { log.Fatal(err) } defer fr.Close()
pr, err := reader.NewParquetReader(fr, new(Measurement), 4) if err != nil { log.Fatal(err) } defer pr.ReadStop()
num := int(pr.GetNumRows()) statsMap := make(map[string]*Stats)
for i := 0; i < num; i += 1000000 { readNum := 1000000 if i+readNum > num { readNum = num - i }
measurements := make([]Measurement, readNum) if err = pr.Read(&measurements); err != nil { log.Fatal(err) }
for _, m := range measurements { stat, exists := statsMap[m.StationName] if !exists { statsMap[m.StationName] = &Stats{ Min: m.Measurement, Max: m.Measurement, Sum: m.Measurement, Count: 1, } } else { if m.Measurement < stat.Min { stat.Min = m.Measurement } if m.Measurement > stat.Max { stat.Max = m.Measurement } stat.Sum += m.Measurement stat.Count++ } } }
stationNames := make([]string, 0, len(statsMap)) for station := range statsMap { stationNames = append(stationNames, station) } sort.Strings(stationNames)
fmt.Print("{") for _, station := range stationNames { stat := statsMap[station] mean := stat.Sum / float64(stat.Count) fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max) } fmt.Print("\b\b} \n")}
func main() { processParquetFile("data/measurements.parquet")}
复制代码
Parquet 处理结果

通过以 Parquet 格式处理数据,取得了显著的性能提升:


  • Parquet 处理时间:5 秒


Go 的性能更进一步接近了最快的 Java 实现。

结论

Go 在 10 亿行挑战中表现出色。通过利用 Go 的并发模型和优化 I/O 操作,大大缩短了处理时间。通过将数据集转换为二进制格式(如 Parquet)可进一步提高性能。


主要收获:


  • Go 高效的并发机制使其适合处理大型数据集。

  • 优化 I/O 和使用缓冲读取可大幅提高性能。

  • 利用 Parquet 等高效数据格式可大大缩短处理时间。


最终想法


尽管 Go 可能无法取代速度最快的 Java 实现,但在高效处理大数据方面展示了令人印象深刻的能力。工具的选择和优化可以缩小性能差距,使 Go 成为数据密集型任务的可行选择。




你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。为了方便大家以后能第一时间看到文章,请朋友们关注公众号"DeepNoMind",并设个星标吧,如果能一键三连(转发、点赞、在看),则能给我带来更多的支持和动力,激励我持续写下去,和大家共同成长进步!

发布于: 刚刚阅读数: 4
用户头像

俞凡

关注

公众号:DeepNoMind 2017-10-18 加入

俞凡,Mavenir Systems研发总监,关注高可用架构、高性能服务、5G、人工智能、区块链、DevOps、Agile等。公众号:DeepNoMind

评论

发布
暂无评论
10 亿行数据集处理挑战:从 15 分钟到 5 秒_golang_俞凡_InfoQ写作社区