背景
现有大数据平台的 Spark 版本是 2.1.0,Python 版本 2.7 和 3.6 ,通过 PySpark+Jupyter 方式提供服务。Python 2 年岁久远,升级支持 Python 3
遇到的问题
1.平台其他应用程序需要 Python 2,升级不能影响已有的环境;
2.Spark 集群有数百台机器,在每个节点上安装运行环境工作量大且出错概率高;
处理过程
Spark 2.1.0 最高支持的 Python 版本是 Python 3.5(Issue),不能使用已有的 3.6,PySpark 的 API 不兼容。
下载 Python 3.5 源码,指定路径编译安装
./configure --prefix='/usr/local/python3.5' --enable-optimizations && make && make install
复制代码
Python 装好后安装 Jupyter 内核和第三方依赖库
curl https://bootstrap.pypa.io/3.5/get-pip.py -o get-pip.py
/usr/local/python3.5/bin/python3.5 get-pip.py
/usr/local/python3.5/bin/pip3 install ipykernel
复制代码
打包依赖,上传的分布式文件系统
zip python35.zip -rq /usr/local/python3.5
hadoop fs -put python35.zip /tmp
复制代码
编写测试脚本
import random
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('PI').enableHiveSupport().getOrCreate()
num_samples = 100000000
def inside(p):
x, y = random.random(), random.random()
return x*x + y*y < 1
count = spark.sparkContext.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print('PI:',pi)
复制代码
提交 Spark 任务,验证是否正常
bin/spark-submit --name pyspark --num-executors 4 \
--master yarn --deploy-mode client \
--conf spark.pyspark.python=./python35/python3.5/bin/python3 \
--conf spark.pyspark.driver.python=/usr/local/python3.5/bin/python3.5 \
--conf spark.yarn.dist.archives=hdfs://hdfsCluster/tmp/python35.zip#python35 \
test.py
复制代码
注意这里 PYSPARK_PYTHON 变量中 python35 是 hdfs 上 zip 包解压后的路径。若上述命令能看到输出 PI 的结果,则 Spark 已可用。
现有平台的 Notebook 是定制过的,使用 pyspark-shell 的内核。
由shell.py的源码可看到,pyspark-shell 其实是在启动时初始化了一些环境,如 spark,sql,sc 等,
这里增加创建 Session 的参数即可
spark = SparkSession.builder\
.enableHiveSupport()\
.config('spark.yarn.dist.archives','hdfs://hdfsCluster/tmp/python35.zip#python35')\
.config('spark.submit.deployMode','client')\
.getOrCreate()
复制代码
修改内核文件 kernel.json,指定环境变量和 Python 路径
{
"display_name": "pySpark",
"language": "python",
"argv": [
"/usr/local/python3.5/bin/python3",
"-m",
"ipykernel_launcher",
"-f",
"{connection_file}"
],
"env": {
"PYSPARK_PYTHON": "./python35/python3.5/bin/python3",
"PYSPARK_DRIVER_PYTHON": "/usr/local/python3.5/bin/python3",
}
}
复制代码
重启 Notebook 生效
总结思考
上面通过 archive 的方式解决了运行环境分发的问题,避免了在每个 worker 节点机械部署的操作。
archive 的方式也有不足,即计算时每个 Executor 均需要从分布式文件系统下载环境,增加了网络 IO 等开销,开启动态资源伸缩功能时消耗更多;当然总体上还是利大于弊。
这里的 archive 其实是制作运行时的过程,类似容器技术的镜像,自然地可以想到用 Docker 的方式来处理。在没有历史包袱的场景,该问题用 K8s 可以轻松得到解决,并且 Jupyter 多租、Spark 调度均有很好的支持。
从解耦角度上,Jupyter 使用标准的 Python 内核会更合适,PySpark 仅作为 Spark 的 Client 库,交由用户创建 Session,也减轻了平台的负担。比如这个场景中,若 Python 库有更新需要打包,则所有用户都会感受到,多数时候不一定是好事。
评论