Golang 在处理 10 亿行数据集的挑战中展现了高效的并发处理和优化的 I/O 操作能力,通过使用 Parquet 二进制格式,进一步提升了数据处理性能,并最终将处理时间从 15 分钟优化到了 5 秒。原文: Go One Billion Row Challenge — From 15 Minutes to 5 Seconds
10 亿行挑战在编程界引起了广泛关注,其主要目的是测试不同编程语言处理并汇总包含 10 亿行的海量数据集的速度,而以性能和并发能力著称的 Go 是有力竞争者。目前性能最好的 Java 实现处理数据的时间仅为 1.535 秒,我们看看 Go 的表现如何。
本文将基于 Go 特有的功能进行优化。注:所有基准数据都是在多次运行后计算得出的。
硬件设置
在配备 M2 Pro 芯片的 16 英寸 MacBook Pro 上进行了所有测试,该芯片拥有 12 个 CPU 核和 36 GB 内存。不同环境运行的测试结果可能因硬件而异,但相对性能差异应该差不多。
什么是 "10 亿行挑战"?
挑战很简单:处理包含 10 亿个任意温度测量值的文本文件,并计算每个站点的汇总统计数据(最小值、平均值和最大值)。问题在于如何高效处理如此庞大的数据集。
数据集通过代码仓库中的 createMeasurements.go
脚本生成。运行脚本后,将得到一个 12.8 GB 大的由分号分隔的文本文件(measurements.txt
),包含两列数据:站点名称和测量值。
我们需要处理该文件,并输出以下结果:
{Station1=12.3/25.6/38.9, Station2=10.0/22.5/35.0, ...}
复制代码
我们看看如何实现这一目标。
Go 初始实现
单核版本
我们从基本的单线程实现开始。该版本逐行读取文件,解析数据,并更新映射以跟踪统计数据。
package main
import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
)
type Stats struct {
Min float64
Max float64
Sum float64
Count int64
}
func processFile(filename string) {
file, err := os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
statsMap := make(map[string]*Stats)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
parts := strings.Split(line, ";")
if len(parts) != 2 {
continue
}
station := parts[0]
measurement, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
continue
}
stat, exists := statsMap[station]
if !exists {
statsMap[station] = &Stats{
Min: measurement,
Max: measurement,
Sum: measurement,
Count: 1,
}
} else {
if measurement < stat.Min {
stat.Min = measurement
}
if measurement > stat.Max {
stat.Max = measurement
}
stat.Sum += measurement
stat.Count++
}
}
if err := scanner.Err(); err != nil {
panic(err)
}
fmt.Print("{")
for station, stat := range statsMap {
mean := stat.Sum / float64(stat.Count)
fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max)
}
fmt.Print("\b\b} \n")
}
func main() {
processFile("data/measurements.txt")
}
复制代码
多核版本
为了充分利用多个 CPU 核,我们把文件分成若干块,然后利用 Goroutine 和 Channel 并行处理。
package main
import (
"bufio"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"
)
type Stats struct {
Min float64
Max float64
Sum float64
Count int64
}
func worker(lines []string, wg *sync.WaitGroup, statsChan chan map[string]*Stats) {
defer wg.Done()
statsMap := make(map[string]*Stats)
for _, line := range lines {
parts := strings.Split(line, ";")
if len(parts) != 2 {
continue
}
station := parts[0]
measurement, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
continue
}
stat, exists := statsMap[station]
if !exists {
statsMap[station] = &Stats{
Min: measurement,
Max: measurement,
Sum: measurement,
Count: 1,
}
} else {
if measurement < stat.Min {
stat.Min = measurement
}
if measurement > stat.Max {
stat.Max = measurement
}
stat.Sum += measurement
stat.Count++
}
}
statsChan <- statsMap
}
func processFile(filename string) {
file, err := os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
numCPU := runtime.NumCPU()
linesPerWorker := 1000000
scanner := bufio.NewScanner(file)
lines := make([]string, 0, linesPerWorker)
var wg sync.WaitGroup
statsChan := make(chan map[string]*Stats, numCPU)
go func() {
wg.Wait()
close(statsChan)
}()
for scanner.Scan() {
lines = append(lines, scanner.Text())
if len(lines) >= linesPerWorker {
wg.Add(1)
go worker(lines, &wg, statsChan)
lines = make([]string, 0, linesPerWorker)
}
}
if len(lines) > 0 {
wg.Add(1)
go worker(lines, &wg, statsChan)
}
if err := scanner.Err(); err != nil {
panic(err)
}
finalStats := make(map[string]*Stats)
for partialStats := range statsChan {
for station, stat := range partialStats {
existingStat, exists := finalStats[station]
if !exists {
finalStats[station] = stat
} else {
if stat.Min < existingStat.Min {
existingStat.Min = stat.Min
}
if stat.Max > existingStat.Max {
existingStat.Max = stat.Max
}
existingStat.Sum += stat.Sum
existingStat.Count += stat.Count
}
}
}
fmt.Print("{")
for station, stat := range finalStats {
mean := stat.Sum / float64(stat.Count)
fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max)
}
fmt.Print("\b\b} \n")
}
func main() {
processFile("data/measurements.txt")
}
复制代码
Go 实现结果
单核和多核版本的运行结果分别如下:
单核版本:15 分 30 秒
多核版本:6 分 45 秒
虽然多核版本有明显改善,但处理数据仍然花了好几分钟。下面看看如何进一步优化。
利用 Go 的并发和缓冲 I/O 进行优化
为了提高性能,我们考虑利用缓冲 I/O,并优化 Goroutine。
package main
import (
"bufio"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"
)
type Stats struct {
Min float64
Max float64
Sum float64
Count int64
}
func worker(id int, jobs <-chan []string, results chan<- map[string]*Stats, wg *sync.WaitGroup) {
defer wg.Done()
for lines := range jobs {
statsMap := make(map[string]*Stats)
for _, line := range lines {
parts := strings.Split(line, ";")
if len(parts) != 2 {
continue
}
station := parts[0]
measurement, err := strconv.ParseFloat(parts[1], 64)
if err != nil {
continue
}
stat, exists := statsMap[station]
if !exists {
statsMap[station] = &Stats{
Min: measurement,
Max: measurement,
Sum: measurement,
Count: 1,
}
} else {
if measurement < stat.Min {
stat.Min = measurement
}
if measurement > stat.Max {
stat.Max = measurement
}
stat.Sum += measurement
stat.Count++
}
}
results <- statsMap
}
}
func processFile(filename string) {
file, err := os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
numCPU := runtime.NumCPU()
jobs := make(chan []string, numCPU)
results := make(chan map[string]*Stats, numCPU)
var wg sync.WaitGroup
for i := 0; i < numCPU; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
go func() {
wg.Wait()
close(results)
}()
scanner := bufio.NewScanner(file)
bufferSize := 1000000
lines := make([]string, 0, bufferSize)
for scanner.Scan() {
lines = append(lines, scanner.Text())
if len(lines) >= bufferSize {
jobs <- lines
lines = make([]string, 0, bufferSize)
}
}
if len(lines) > 0 {
jobs <- lines
}
close(jobs)
if err := scanner.Err(); err != nil {
panic(err)
}
finalStats := make(map[string]*Stats)
for partialStats := range results {
for station, stat := range partialStats {
existingStat, exists := finalStats[station]
if !exists {
finalStats[station] = stat
} else {
if stat.Min < existingStat.Min {
existingStat.Min = stat.Min
}
if stat.Max > existingStat.Max {
existingStat.Max = stat.Max
}
existingStat.Sum += stat.Sum
existingStat.Count += stat.Count
}
}
}
fmt.Print("{")
for station, stat := range finalStats {
mean := stat.Sum / float64(stat.Count)
fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max)
}
fmt.Print("\b\b} \n")
}
func main() {
processFile("data/measurements.txt")
}
复制代码
优化 Go 实现结果
优化后,处理时间大大缩短:
确实获得了实质性改进,但仍然无法与最快的 Java 实现相媲美。
利用高效数据格式
由于文本文件不是处理大型数据集的最有效方式,我们考虑将数据转换为 Parquet 等二进制格式来提高效率。
转换为 Parquet
可以用 Apache Arrow 等工具将文本文件转换为 Parquet 文件。为简单起见,假设数据已转换为 measurements.parquet
。
用 Go 处理 Parquet 文件
我们用 parquet-go
库来读取 Parquet 文件。
package main
import (
"fmt"
"log"
"sort"
"github.com/xitongsys/parquet-go/reader"
"github.com/xitongsys/parquet-go/source/local"
)
type Measurement struct {
StationName string `parquet:"name=station_name, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Measurement float64 `parquet:"name=measurement, type=DOUBLE"`
}
type Stats struct {
Min float64
Max float64
Sum float64
Count int64
}
func processParquetFile(filename string) {
fr, err := local.NewLocalFileReader(filename)
if err != nil {
log.Fatal(err)
}
defer fr.Close()
pr, err := reader.NewParquetReader(fr, new(Measurement), 4)
if err != nil {
log.Fatal(err)
}
defer pr.ReadStop()
num := int(pr.GetNumRows())
statsMap := make(map[string]*Stats)
for i := 0; i < num; i += 1000000 {
readNum := 1000000
if i+readNum > num {
readNum = num - i
}
measurements := make([]Measurement, readNum)
if err = pr.Read(&measurements); err != nil {
log.Fatal(err)
}
for _, m := range measurements {
stat, exists := statsMap[m.StationName]
if !exists {
statsMap[m.StationName] = &Stats{
Min: m.Measurement,
Max: m.Measurement,
Sum: m.Measurement,
Count: 1,
}
} else {
if m.Measurement < stat.Min {
stat.Min = m.Measurement
}
if m.Measurement > stat.Max {
stat.Max = m.Measurement
}
stat.Sum += m.Measurement
stat.Count++
}
}
}
stationNames := make([]string, 0, len(statsMap))
for station := range statsMap {
stationNames = append(stationNames, station)
}
sort.Strings(stationNames)
fmt.Print("{")
for _, station := range stationNames {
stat := statsMap[station]
mean := stat.Sum / float64(stat.Count)
fmt.Printf("%s=%.1f/%.1f/%.1f, ", station, stat.Min, mean, stat.Max)
}
fmt.Print("\b\b} \n")
}
func main() {
processParquetFile("data/measurements.parquet")
}
复制代码
Parquet 处理结果
通过以 Parquet 格式处理数据,取得了显著的性能提升:
Go 的性能更进一步接近了最快的 Java 实现。
结论
Go 在 10 亿行挑战中表现出色。通过利用 Go 的并发模型和优化 I/O 操作,大大缩短了处理时间。通过将数据集转换为二进制格式(如 Parquet)可进一步提高性能。
主要收获:
最终想法
尽管 Go 可能无法取代速度最快的 Java 实现,但在高效处理大数据方面展示了令人印象深刻的能力。工具的选择和优化可以缩小性能差距,使 Go 成为数据密集型任务的可行选择。
你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。为了方便大家以后能第一时间看到文章,请朋友们关注公众号"DeepNoMind",并设个星标吧,如果能一键三连(转发、点赞、在看),则能给我带来更多的支持和动力,激励我持续写下去,和大家共同成长进步!
评论