import logging
import socket
import sys
import time
import spu
from sklearn.metrics import mean_squared_error, roc_auc_score
import secretflow as sf
from secretflow.data import FedNdarray, PartitionWay
from secretflow.device.driver import reveal, wait
from secretflow.ml.boost.sgb_v import Sgb
from secretflow.utils.simulation.datasets import create_df
from secretflow.data.vertical import read_csv as v_read_csv
# init log
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.info("test")
_parties = {
# you may change the addresses
# 将alice、bob、carol的ip替换为实际ip
'alice': {'address': '192.168.0.1:23041'},
'bob': {'address': '192.168.0.2:23042'},
'carol': {'address': '192.168.0.3:23043'},
}
def setup_sf(party, alice_ip, bob_ip, carol_ip):
cluster_conf = {
'parties': _parties,
'self_party': party,
}
# init cluster
_system_config = {'lineage_pinning_enabled': False}
sf.init(
address='local',
num_cpus=8,
log_to_driver=True,
cluster_config=cluster_conf,
exit_on_failure_cross_silo_sending=True,
_system_config=_system_config,
_memory=5 * 1024 * 1024 * 1024,
cross_silo_messages_max_size_in_bytes = 2 * 1024 * 1024 * 1024 -1,
object_store_memory=5 * 1024 * 1024 * 1024,
)
# SPU settings
cluster_def = {
'nodes': [
{'party': 'alice', 'id': 'local:0', 'address': alice_ip},
{'party': 'bob', 'id': 'local:1', 'address': bob_ip},
{'party': 'carol', 'id': 'local:1', 'address': carol_ip},
],
'runtime_config': {
# SEMI2K support 2/3 PC, ABY3 only support 3PC, CHEETAH only support 2PC.
# pls pay attention to size of nodes above. nodes size need match to PC setting.
'protocol': spu.spu_pb2.ABY3,
'field': spu.spu_pb2.FM64,
},
}
# HEU settings
heu_config = {
'sk_keeper': {'party': 'alice'},
'evaluators': [{'party': 'bob'},{'party': 'carol'}],
'mode': 'PHEU', # 这里修改同态加密相关配置
'he_parameters': {
'schema': 'paillier',
'key_pair': {
'generate': {
'bit_size': 2048,
},
},
},
'encoding': {
'cleartext_type': 'DT_I32',
'encoder': "IntegerEncoder",
'encoder_args': {"scale": 1},
},
}
return cluster_def, heu_config
class SGB_benchmark:
def __init__(self, cluster_def, heu_config):
self.alice = sf.PYU('alice')
self.bob = sf.PYU('bob')
self.carol = sf.PYU('carol')
self.heu = sf.HEU(heu_config, cluster_def['runtime_config']['field'])
def run_sgb(self, test_name, v_data, label_data, y, logistic, subsample, colsample):
sgb = Sgb(self.heu)
start = time.time()
params = {
'num_boost_round': 5,
'max_depth': 5,
'sketch_eps': 0.08,
'objective': 'logistic' if logistic else 'linear',
'reg_lambda': 0.3,
'subsample': subsample,
'colsample_by_tree': colsample,
}
model = sgb.train(params, v_data, label_data)
# reveal(model.weights[-1])
print(f"{test_name} train time: {time.time() - start}")
start = time.time()
yhat = model.predict(v_data)
yhat = reveal(yhat)
print(f"{test_name} predict time: {time.time() - start}")
if logistic:
print(f"{test_name} auc: {roc_auc_score(y, yhat)}")
else:
print(f"{test_name} mse: {mean_squared_error(y, yhat)}")
fed_yhat = model.predict(v_data, self.alice)
assert len(fed_yhat.partitions) == 1 and self.alice in fed_yhat.partitions
yhat = reveal(fed_yhat.partitions[self.alice])
assert yhat.shape[0] == y.shape[0], f"{yhat.shape} == {y.shape}"
if logistic:
print(f"{test_name} auc: {roc_auc_score(y, yhat)}")
else:
print(f"{test_name} mse: {mean_squared_error(y, yhat)}")
def test_on_linear(self, sample_num, total_num):
"""
sample_num: int. this number * 10000 = sample number in dataset.
"""
io_start = time.perf_counter()
common_path = "/root/sf-benchmark/data/{}w_{}d_3pc/independent_linear.".format(
sample_num, total_num
)
vdf = v_read_csv(
{self.alice: common_path + "1.csv", self.bob: common_path + "2.csv", self.carol: common_path + "3.csv"},
keys='id',
drop_keys='id',
)
# split y out of dataset,
# <<< !!! >>> change 'y' if label column name is not y in dataset.
label_data = vdf["y"]
# v_data remains all features.
v_data = vdf.drop(columns="y")
# <<< !!! >>> change bob if y not belong to bob.
y = reveal(label_data.partitions[self.alice].data)
wait([p.data for p in v_data.partitions.values()])
io_end = time.perf_counter()
print("io takes time", io_end - io_start)
self.run_sgb("independent_linear", v_data, label_data, y, True, 1, 1)
def run_test(party):
cluster_def, heu_config = setup_sf(party, _parties['alice'], _parties['bob'], _parties['carol'])
test_suite = SGB_benchmark(cluster_def, heu_config)
test_suite.test_on_linear(100, 100)
sf.shutdown()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(prog='sgb benchmark remote')
parser.add_argument('party')
args = parser.parse_args()
run_test(args.party)
评论