摘要:本文将介绍如何在 Spark scala 程序中调用 Python 脚本,Spark java 程序调用的过程也大体相同。
本文分享自华为云社区《【Spark】如何在Spark Scala/Java应用中调用Python脚本》,作者:小兔子 615 。
1.PythonRunner
对于运行与 JVM 上的程序(即 Scala、Java 程序),Spark 提供了 PythonRunner 类。只需要调用 PythonRunner 的 main 方法,就可以在 Scala 或 Java 程序中调用 Python 脚本。在实现上,PythonRunner 基于 py4j ,通过构造 GatewayServer 实例让 python 程序通过本地网络 socket 来与 JVM 通信。
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val localhost = InetAddress.getLoopbackAddress()
val gatewayServer = new py4j.GatewayServer.GatewayServerBuilder()
.authToken(secret)
.javaPort(0)
.javaAddress(localhost)
.callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
.build()
val thread = new Thread(new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions {
gatewayServer.start()
}
})
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()
// Wait until the gateway server has started, so that we know which port is it bound to.
// `gatewayServer.start()` will start a new thread and run the server code there, after
// initializing the socket, so the thread started above will end as soon as the server is
// ready to serve connections.
thread.join()
复制代码
在启动 GatewayServer 后,再通过 ProcessBuilder 构造子进程执行 Python 脚本,等待 Python 脚本执行完成后,根据 exitCode 判断是否执行成功,若执行失败则抛出异常,最后关闭 gatewayServer。
// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
try {
val process = builder.start()
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
val exitCode = process.waitFor()
if (exitCode != 0) {
throw new SparkUserAppException(exitCode)
}
} finally {
gatewayServer.shutdown()
}
复制代码
2.调用方法
2.1 调用代码
PythonRunner 的 main 方法中需要传入三个参数:
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
复制代码
具体样例代码如下,scala 样例代码:
package com.huawei.bigdata.spark.examples
import org.apache.spark.deploy.PythonRunner
import org.apache.spark.sql.SparkSession
object RunPythonExample {
def main(args: Array[String]) {
val pyFilePath = args(0)
val pyFiles = args(1)
val spark = SparkSession
.builder()
.appName("RunPythonExample")
.getOrCreate()
runPython(pyFilePath, pyFiles)
spark.stop()
}
def runPython(pyFilePath: String, pyFiles :String) : Unit = {
val inputPath = "-i /input"
val outputPath = "-o /output"
PythonRunner.main(Array(pyFilePath, pyFiles, inputPath, outputPath))
}
}
复制代码
python 样例代码:
#!/usr/bin/env python
# coding: utf-8
import sys
import argparse
argparser = argparse.ArgumentParser(description="ParserMainEntrance")
argparser.add_argument('--input', '-i', help="input path", default=list(), required=True)
argparser.add_argument('--output', '-o', help="output path", default=list(), required=True)
arglist = argparser.parse_args()
def getTargetPath(input_path, output_path):
try:
print("input path: {}".format(input_path))
print("output path: {}".format(output_path))
return True
except Exception as ex:
print("error with: {}".format(ex))
return False
if __name__ == "__main__":
ret = getTargetPath(arglist.input, arglist.output)
if ret:
sys.exit(0)
else:
sys.exit(1)
复制代码
2.2 运行命令
执行 python 脚本需要设置 pythonExec,即执行 python 脚本所使用的执行环境。默认情况下,使用的执行器为 python(Spark 2.4 及以下)或 python3 (Spark 3.0 及以上)。
//Spark 2.4.5
val sparkConf = new SparkConf()
val secret = Utils.createSecret(sparkConf)
val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
.orElse(sparkConf.get(PYSPARK_PYTHON))
.orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
.orElse(sys.env.get("PYSPARK_PYTHON"))
.getOrElse("python")
//Spark 3.1.1
val sparkConf = new SparkConf()
val secret = Utils.createSecret(sparkConf)
val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
.orElse(sparkConf.get(PYSPARK_PYTHON))
.orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
.orElse(sys.env.get("PYSPARK_PYTHON"))
.getOrElse("python3")
复制代码
如果要手动指定 pythonExec,需要在执行前设置环境变量(无法通过 spark-defaults 传入)。在 cluster 模式下,可以通过 --conf “spark.executorEnv.PYSPARK_PYTHON=python3” --conf “spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3” 设置。driver 端还可以通过 export PYSPARK_PYTHON=python3 设置环境变量。
若需要上传 pyhton 包,可以通过 --archive python.tar.gz 的方式上传。
为了使应用能够获取到 py 脚本文件,还需要在启动命令中添加 --file pythonFile.py 将 python 脚本上传到 yarn 上。
运行命令参考如下:
spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --conf "spark.executorEnv.PYSPARK_PYTHON=python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3" /usr/local/test.jar test.py test.py
复制代码
如果需要使用其他 python 环境,而非节点上已安装的,可以通过 --archives 上传 python 压缩包,再通过环境变量指定 pythonExec,例如:
spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.RunPythonExample --files /usr/local/test.py --archives /usr/local/python.tar.gz#myPython --conf "spark.executorEnv.PYSPARK_PYTHON=myPython/bin/python3" --conf "spark.yarn.appMasterEnv.PYSPARK_PYTHON=myPython/bin/python3" /usr/local/test.jar test.py test.py
复制代码
点击关注,第一时间了解华为云新鲜技术~
评论