写点什么

【隐语 SecretFlow】 Unbalanced PSI Benchmark 性能测试报告

  • 2025-10-13
    浙江
  • 本文字数:4353 字

    阅读完需:约 14 分钟


导语

2022 年 10 月份,隐语发布了 PSI 的性能数据,当时就引起了内部和外部用户的广泛关注,具体协议包括:ecdh/kkrt16/bc22 协议,这些协议更适合双方数据量差别不大的场景,称为平衡 PSI(Balanced PSI)。

在现实的隐私求交场景中,有时双方数据量级差异很大,例如:百万 vs 十亿,2 千万 vs 20 亿。针对这种场景,隐语实现并开源了专门的非平衡 PSI(Unbalanced PSI)协议,能得到更好的性能。

具体来讲:与 ecdh-psi 对比,ecdh-psi 在大数据集上进行两次加密操作。隐语实现的非平衡 PSI 只在大数据集上进行一次加密操作,在大数据集与小数据集的体量相差非常大的时候,总体计算量和运行时间大约是 ecdh-psi 的 1/2。

非平衡 PSI 还把协议分成离线和在线(offline/online)两个阶段,在提前执行离线(offline)缓存的情形下,在线阶段只需 10 多分钟即可完成在线(online)协议,得到交集结果。

本文给出隐语非平衡 PSI 协议(Unbalanced PSI)的具体测试环境、步骤、和数据,供大家参考。

复现路径

一、测试机型

  • Python:3.8

  • pip: >= 19.3

  • OS: CentOS 7

  • CPU/Memory: 推荐最低配置是 8C16G

  • 硬盘:500G

二、安装 conda

使用 conda 管理 python 环境,如果机器没有 conda 需要先安装。

步骤如下:

#sudo apt-get install wgetwget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
#安装bash Miniconda3-latest-Linux-x86_64.sh
# 一直按回车然后输入yesplease answer 'yes' or 'no':>>> yes
# 选择安装路径, 文件名前加点号表示隐藏文件Miniconda3 will now be installed into this location:>>> ~/.miniconda3
# 添加配置信息到 ~/.bashrc文件Do you wish the installer to initialize Miniconda3 by running conda init? [yes|no][no] >>> yes
#运行配置信息文件或重启电脑source ~/.bashrc
#测试是否安装成功,有显示版本号表示安装成功conda --version
复制代码

三、安装 secretflow

conda create -n sf-benchmark python=3.8
conda activate sf-benchmark
pip install -U secretflow
复制代码

四、创建节点并启动集群

  • 创建 ray header 节点

创建 ray header 节点,选择一台机器为主机,在主机上执行如下命令,ip 替换为主机的内网 ip,命名为 alice,端口选择一个空闲端口即可

注意:192.168.0.1 ip 为 mock,请替换为实际的 ip 地址

RAY_DISABLE_REMOTE_CODE=true \ray start --head --node-ip-address="192.168.0.1" --port="9394" --resources='{"alice": 8}' --include-dashboard=False
复制代码
  • 创建从属节点

创建从属节点,在 bob 机器执行如下命令,ip 依然填 alice 机器的内网 ip,命名为 bob,端口不变

RAY_DISABLE_REMOTE_CODE=true \ray start --address="192.168.0.1:9394" --resources='{"bob": 8}'
复制代码

五、数据要求

Alice 方:2000 万

Bob 方:20 亿

交集:1000 万

六、Benchmark 脚本

脚本分为 offlineonlineoffline 用于对大数据方的 setup、online 对小数据的执行基于 ecdhoprf 协议。参考阅读:非平衡隐私集合求交(Unbalanced PSI)协议介绍。

  • offline 脚本:

import osimport sysimport timeimport loggingimport multiprocess
from absl import appimport spuimport secretflow as sf#import random
# init loglogging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

# SPU settingscluster_def = { 'nodes': [ # <<< !!! >>> replace <192.168.0.1:17268> to alice node's local ip & free port {'party': 'alice', 'id': 'local:0', 'address': f'192.168.0.1:17268'}, # <<< !!! >>> replace <192.168.0.2:17269> to bob node's local ip & free port {'party': 'bob', 'id': 'local:1', 'address': f'192.168.0.2:17269'}, ], 'runtime_config': { 'protocol': spu.spu_pb2.SEMI2K, 'field': spu.spu_pb2.FM128, },}
link_desc = { 'recv_timeout_ms': 3600000,}

def main(_): # sf init # <<< !!! >>> replace <192.168.0.1:9394> to your ray head sf.shutdown() sf.init(['alice','bob'],address='192.168.0.1:9394',log_to_driver=True,omp_num_threads=multiprocess.cpu_count())
# init log logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
alice = sf.PYU('alice') bob = sf.PYU('bob') #carol = sf.PYU('carol')
# <<< !!! >>> replace path to real parties local file path. offline_input_path = { alice: 'dummyalice.csv', bob: '/root/benchmark/unbalanced_200000w.csv', } select_keys = { alice: ['id'], bob: ['id'], } spu = sf.SPU(cluster_def, link_desc)
# offline print("=====offline phase====") start = time.time()
offline_output_path = { alice: "/data/unbalanced_2000w_out.csv", bob: "/data/unbalanced_200000w_out.csv", }
offline_preprocess_path = "/root/benchmark/offline_out/offline_psi0107.csv" secret_key = "000102030405060708090a0b0c0d0e0ff0e0d0c0b0a090807060504030201000" secret_key_path = "/root/benchmark/secret_key.bin" with open(secret_key_path, 'wb') as f: f.write(bytes.fromhex(secret_key))
reports = spu.psi_csv( key=select_keys, input_path=offline_input_path, output_path=offline_output_path, receiver='alice', # if `broadcast_result=False`, only receiver can get output file. protocol='ECDH_OPRF_UB_PSI_2PC_OFFLINE', # psi protocol precheck_input=False, # will cost ext time if set True sort=True, # will cost ext time if set True broadcast_result=False, # will cost ext time if set True bucket_size=10000000, curve_type="CURVE_FOURQ", preprocess_path=offline_preprocess_path, ecdh_secret_key_path=secret_key_path, ) #print(f"psi reports: {reports}") logging.info(f"offline psi reports: {reports}") logging.info(f"cost time: {time.time() - start}")
sf.shutdown()

if __name__ == '__main__': app.run(main)
复制代码
  • online 脚本:

import osimport sysimport time# import randomimport loggingimport multiprocess
from absl import appimport spuimport secretflow as sf
# init loglogging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# SPU settingscluster_def = { 'nodes': [ # <<< !!! >>> replace <192.168.0.1:13666> to alice node's local ip & free port {'party': 'alice', 'id': 'local:0', 'address': f'192.168.0.1:13666'}, # <<< !!! >>> replace <192.168.0.2:12946> to bob node's local ip & free port {'party': 'bob', 'id': 'local:1', 'address': f'192.168.0.1:13667'}, ], 'runtime_config': { 'protocol': spu.spu_pb2.SEMI2K, 'field': spu.spu_pb2.FM128, },}
link_desc = { 'recv_timeout_ms': 3600000,}
def main(_): # sf init # <<< !!! >>> replace <192.168.0.1:9394> to your ray head sf.shutdown() sf.init(['alice','bob'],address='192.168.0.1:9394',log_to_driver=True,omp_num_threads=multiprocess.cpu_count())
# init log logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
alice = sf.PYU('alice') bob = sf.PYU('bob')
# <<< !!! >>> replace path to real parties local file path. online_input_path = { alice: '/root/benchmark/unbalanced_2000w.csv', bob: 'dummy.bob.csv', } output_path = { alice: '/data/unbalanced_20000wvs2000w.csv', bob: '/data/unbalanced_20000wvs2000w.csv', } select_keys = { alice: ['id'], bob: ['id'], } spu = sf.SPU(cluster_def, link_desc)
offline_preprocess_path = "/root/benchmark/offline_out/offline_psi0107.csv" secret_key_path = "/root/benchmark/secret_key.bin"
# online print("=====online phase====") start = time.time()
reports = spu.psi_csv( key=select_keys, input_path=online_input_path, output_path=output_path, receiver='alice', # if `broadcast_result=False`, only receiver can get output file. protocol='ECDH_OPRF_UB_PSI_2PC_ONLINE', # psi protocol precheck_input=True, # will cost ext time if set True sort=True, # will cost ext time if set True broadcast_result=False, # will cost ext time if set True bucket_size=100000000, curve_type="CURVE_FOURQ", preprocess_path=offline_preprocess_path, ecdh_secret_key_path=secret_key_path, )
#print(f"psi reports: {reports}") logging.info(f"online psi reports: {reports}") logging.info(f"cost time: {time.time() - start}")
sf.shutdown()

if __name__ == '__main__': app.run(main)
复制代码
  • Benchmark 报告



备注:目前是小数据方到大数据方的交集结果,如果大数据方到小数据方的交集结果 online 运行时间会增加一倍 。




解读:

  • Unbalanced PSI 的 offline 阶段,大数据方对大数据集(20 亿)进行加密并发送(截取 12B)到小数据方,加密(ecc 点乘)可以进行并发计算,CPU 的核数增大可以提高性能。可以看到性能数据基本和 CPU 的核数成线性关系,网络带宽消耗不大。

  • Unbalanced PSI 的 online 阶段,可以划分为两部分子阶段,对小数据集数据执行 ecdh-oprf 得到小数据集的加密结果;小数据集加密结果和 offline 阶段的到大数据集加密数据进行比较的到交集。总体计算量和传输量不大,运行时间在 10 几分钟左右。

Balanced PSI Benchmark 报告

为了方便大家对比,同等硬件资源和数据规模下,对平衡 PSI 做了性能测试,报告如下:




解读:

Balanced PSI 跑 20 亿*2000 万规模数据时,在 8C16G 资源下 CPU 成为瓶颈,机器资源提升到 64C128G 后 LAN 环境 79 分钟完成任务执行。

用户头像

关注微信公众号:隐语的小剧场 2022-08-01 加入

隐语SecretFlow是蚂蚁自主研发的隐私计算开源框架,内置MPC、TEE、同态等多种密态计算虚拟设备供灵活选择。同时我们专注于隐私计算领域任何前沿技术、最新动态、行业资讯,隐语期待您的加入!

评论

发布
暂无评论
【隐语SecretFlow】 Unbalanced PSI Benchmark性能测试报告_隐语SecretFlow_InfoQ写作社区