大数据 -88 Spark Super Word Count 全流程实现(Scala + MySQL)

点一下关注吧!!!非常感谢!!持续更新!!!
🚀 AI 篇持续更新中!(长期更新)
AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖
💻 Java 篇正式开启!(300 篇)
目前 2025 年 09 月 01 日更新到:Java-113 深入浅出 MySQL 扩容全攻略:触发条件、迁移方案与性能优化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!
📊 大数据板块已完成多项干货更新(300 篇):
包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解

章节内容
上节完成的内容如下:
Spark 案例编写 Scala
计算圆周率
找共同的好友

Super Word Count
需求背景详细说明
文本预处理任务分解
大小写转换:将输入文本中的所有字母统一转换为小写形式,确保后续处理不受大小写影响。例如:"Hello World" → "hello world"
标点符号处理:
默认去除常见标点符号:! " # $ % & ' ( ) * + , - . / : ; < = > ? @ [ \ ] ^ _ ` { | } ~
支持自定义标点符号集合,用户可传入特定符号列表进行过滤
示例:输入"Hello, world!" → 处理后"hello world"
停用词过滤:
默认使用英文常见停用词列表(如 NLTK 的停用词集)
支持用户自定义停用词列表,可灵活添加业务相关停用词
示例:输入"the quick brown fox" → 过滤后"quick brown fox"(假设"the"在停用词表中)
词频统计:
对处理后的单词进行计数
按词频降序排列结果
示例:输入"apple banana apple" → 输出[('apple',2), ('banana',1)]
数据存储:
结果保存到 MySQL 数据库
需要设计包含以下字段的表:
id(主键)
word(单词)
count(词频)
create_time(记录创建时间)
支持批量插入和更新操作
技术实现要点
提供配置接口允许用户自定义:
punctuation_set
: 标点符号集合stop_words
: 停用词列表实现高效的正则表达式处理标点符号
使用字典结构进行词频统计优化性能
MySQL 连接使用连接池管理
添加异常处理机制确保数据完整性
典型应用场景
舆情分析系统中的关键词提取
搜索引擎的索引构建
文本分类任务的前期处理
用户评论的词云生成
日志分析中的高频词统计
扩展功能考虑
支持处理多语言文本(如中文分词)
添加词干提取(stemming)或词形还原(lemmatization)选项
实现定时任务自动处理新增文本
增加可视化统计结果导出功能
编写代码
先实现到 MySQL 保存前的内容,我们需要先编写测试一下我们的代码是否正确
详细解释
object SuperWordCount1 { ... }
SuperWordCount1 是一个 Scala 对象,定义了一个单例对象用于运行单词计数程序。
private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\s+")
这里定义了一个 stopWords 列表,包含了常见的停用词,这些词在统计单词频率时会被过滤掉。
split("\s+") 方法将这些停用词用空白字符分割成数组,便于后续的查找和过滤。
private val punctuation = "[\)\.,:;'!\?]"
定义了一个正则表达式 punctuation,用于匹配常见的标点符号。这些标点符号在统计单词频率时会被去除。
def main(args: Array[String]): Unit = { ... }
main 方法是程序的入口点,args 是命令行参数,其中 args(0) 通常表示输入文件的路径。
val conf = new SparkConf().setAppName("ScalaSuperWordCount1").setMaster("local[*]")
SparkConf() 用于配置 Spark 应用程序。setAppName("ScalaSuperWordCount1") 设置应用程序的名称。
setMaster("local[*]") 指定应用程序以本地模式运行,使用所有可用的 CPU 核心。
val sc = new SparkContext(conf)
SparkContext 是 Spark 应用程序的核心,用于与 Spark 集群进行交互。
sc.setLogLevel("WARN")
设置日志级别为 “WARN”,减少日志输出,方便查看重要信息。
val lines: RDD[String] = sc.textFile(args(0))
sc.textFile(args(0)) 从指定的文本文件路径加载数据,创建一个 RDD[String],其中每一行文本都作为一个字符串元素。
lines 是包含输入文本数据的 RDD。
flatMap(_.split("\s+"))
flatMap 方法将每一行字符串按空白字符拆分成单词,并将其展开成单个单词的 RDD。
map(_.toLowerCase)
将每个单词转换为小写,以确保统计时不区分大小写。
map(_.replaceAll(punctuation, ""))
使用正则表达式 punctuation 去除单词中的标点符号,使得统计结果更加准确。
filter(word => !stopWords.contains(word) && word.trim.nonEmpty)
filter 方法过滤掉停用词和空白单词:
!stopWords.contains(word) 确保当前单词不在停用词列表中。
word.trim.nonEmpty 确保单词在去除前后空白字符后不是空字符串。
map((_, 1))
将每个单词映射为 (word, 1) 的键值对,表示每个单词出现一次。
reduceByKey(_ + _)
reduceByKey 方法根据键(单词)对值(计数)进行累加,统计每个单词的总出现次数。
sortBy(_._2, false)
将统计结果按值(单词出现的次数)从大到小排序。
collect().foreach(println)
collect() 方法将 RDD 中的数据收集到驱动程序中(即本地),然后使用 foreach(println) 输出每个单词及其出现的次数。
由于 collect 会将数据从分布式环境中拉到本地,需要注意数据量大的情况下可能导致内存不足的问题。
sc.stop()
在计算完成后,调用 sc.stop() 方法停止 SparkContext,释放资源。
添加依赖
同时我们需要在 build 的部分,也要加入对应的内容,让驱动可以加载进来:
创建库表
我们新建一个数据库,也要新建一个数据表
写入 SQL-未优化
我们在 foreach 中保存了数据,此时需要创建大量的 MySQL 连接,效率是比较低的。
写入 SQL-优化版
优化后使用 foreachPartition 保存数据,一个分区创建一个链接:cache RDD 注意:
SparkSQL 有方便的读写 MySQL 的方法,给参数直接调用即可
但掌握这个方法很重要,因为 SparkSQL 不是支持所有类型的数据库
打包上传
打包并上传到项目:

运行项目
不写入 SQL 版
运行结果如下图:

写入 SQL-未优化版
写入 SQL-优化版
运行结果如下图:

查看数据
查看数据库,内容如下:

版权声明: 本文为 InfoQ 作者【武子康】的原创文章。
原文链接:【http://xie.infoq.cn/article/d8aa8ad7e09fc5ed38c7cbb90】。文章转载请联系作者。
评论