import loggingimport socketimport sysimport time
import spufrom sklearn.metrics import mean_squared_error, roc_auc_score
import secretflow as sffrom secretflow.data import FedNdarray, PartitionWayfrom secretflow.device.driver import reveal, waitfrom secretflow.ml.boost.sgb_v import Sgbfrom secretflow.utils.simulation.datasets import create_dffrom secretflow.data.vertical import read_csv as v_read_csv
# Initialise logging: send records at INFO level and above to stdout.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.info("test")

# Address table of the participating parties, keyed by party name; setup_sf
# hands it to sf.init as the 'parties' entry of cluster_config.
# You may change the addresses: replace the IPs of alice, bob and carol with
# the actual IPs of the machines running each party.
_parties = {
    'alice': {'address': '192.168.0.1:23041'},
    'bob': {'address': '192.168.0.2:23042'},
    'carol': {'address': '192.168.0.3:23043'},
}
def setup_sf(party, alice_ip, bob_ip, carol_ip):
    """Initialise SecretFlow for one party and build the device configurations.

    Calls ``sf.init`` with the module-level ``_parties`` table, then assembles
    the SPU cluster definition and the HEU configuration. No SPU or HEU device
    is created here; the two dicts are only returned.

    Args:
        party: Name of the party this process runs as (sent as ``self_party``).
        alice_ip: Value stored as the ``address`` of alice's SPU node.
        bob_ip: Value stored as the ``address`` of bob's SPU node.
        carol_ip: Value stored as the ``address`` of carol's SPU node.

    Returns:
        Tuple ``(cluster_def, heu_config)``.
    """
    cluster_conf = {
        'parties': _parties,
        'self_party': party,
    }

    # init cluster
    _system_config = {'lineage_pinning_enabled': False}
    sf.init(
        address='local',
        num_cpus=8,
        log_to_driver=True,
        cluster_config=cluster_conf,
        exit_on_failure_cross_silo_sending=True,
        _system_config=_system_config,
        _memory=5 * 1024 * 1024 * 1024,  # 5 GiB
        # 2 GiB minus one byte.
        cross_silo_messages_max_size_in_bytes=2 * 1024 * 1024 * 1024 - 1,
        object_store_memory=5 * 1024 * 1024 * 1024,  # 5 GiB
    )

    # SPU settings
    cluster_def = {
        'nodes': [
            {'party': 'alice', 'id': 'local:0', 'address': alice_ip},
            {'party': 'bob', 'id': 'local:1', 'address': bob_ip},
            # carol gets her own node id; the entry used to repeat bob's
            # 'local:1', which left two nodes with the same id.
            {'party': 'carol', 'id': 'local:2', 'address': carol_ip},
        ],
        'runtime_config': {
            # SEMI2K supports 2/3 PC, ABY3 only supports 3PC, CHEETAH only
            # supports 2PC. The number of nodes above must match the chosen
            # protocol's party count.
            'protocol': spu.spu_pb2.ABY3,
            'field': spu.spu_pb2.FM64,
        },
    }

    # HEU settings
    heu_config = {
        'sk_keeper': {'party': 'alice'},
        'evaluators': [{'party': 'bob'}, {'party': 'carol'}],
        'mode': 'PHEU',
        # Change the homomorphic-encryption settings here.
        'he_parameters': {
            'schema': 'paillier',
            'key_pair': {
                'generate': {
                    'bit_size': 2048,
                },
            },
        },
        'encoding': {
            'cleartext_type': 'DT_I32',
            'encoder': "IntegerEncoder",
            'encoder_args': {"scale": 1},
        },
    }
    return cluster_def, heu_config
class SGB_benchmark:
    """Benchmark harness that trains and evaluates ``Sgb`` across alice, bob and carol."""

    def __init__(self, cluster_def, heu_config):
        """Create one PYU device per party and the shared HEU device.

        Args:
            cluster_def: Dict whose ``['runtime_config']['field']`` entry is
                passed to the HEU constructor.
            heu_config: HEU configuration dict.
        """
        self.alice, self.bob, self.carol = (
            sf.PYU(name) for name in ('alice', 'bob', 'carol')
        )
        field = cluster_def['runtime_config']['field']
        self.heu = sf.HEU(heu_config, field)

    @staticmethod
    def _report_score(test_name, y, yhat, logistic):
        """Print AUC (logistic objective) or MSE (linear objective) of ``yhat`` against ``y``."""
        if logistic:
            print(f"{test_name} auc: {roc_auc_score(y, yhat)}")
            return
        print(f"{test_name} mse: {mean_squared_error(y, yhat)}")

    def run_sgb(self, test_name, v_data, label_data, y, logistic, subsample, colsample):
        """Train an SGB model, then time and score two prediction passes.

        Args:
            test_name: Label prefixed to every printed line.
            v_data: Feature data handed to ``train`` and ``predict``.
            label_data: Label data handed to ``train``.
            y: Revealed labels used to score the predictions.
            logistic: True selects the 'logistic' objective and AUC scoring;
                False selects 'linear' and MSE.
            subsample: Value for the 'subsample' training parameter.
            colsample: Value for the 'colsample_by_tree' training parameter.
        """
        trainer = Sgb(self.heu)

        start = time.time()
        if logistic:
            objective = 'logistic'
        else:
            objective = 'linear'
        params = dict(
            num_boost_round=5,
            max_depth=5,
            sketch_eps=0.08,
            objective=objective,
            reg_lambda=0.3,
            subsample=subsample,
            colsample_by_tree=colsample,
        )
        model = trainer.train(params, v_data, label_data)
        # (A reveal of model.weights[-1] was left disabled at this point.)
        print(f"{test_name} train time: {time.time() - start}")

        # First pass: predict without naming a receiver and reveal the result.
        start = time.time()
        revealed = reveal(model.predict(v_data))
        print(f"{test_name} predict time: {time.time() - start}")
        self._report_score(test_name, y, revealed, logistic)

        # Second pass: send the prediction to alice only and check its shape.
        fed_yhat = model.predict(v_data, self.alice)
        assert len(fed_yhat.partitions) == 1 and self.alice in fed_yhat.partitions
        yhat = reveal(fed_yhat.partitions[self.alice])
        assert yhat.shape[0] == y.shape[0], f"{yhat.shape} == {y.shape}"
        self._report_score(test_name, y, yhat, logistic)

    def test_on_linear(self, sample_num, total_num):
        """Load the three-party 'independent_linear' CSVs and benchmark SGB on them.

        Args:
            sample_num: int. This number * 10000 = sample number in dataset.
            total_num: Fills the ``{}d`` part of the dataset directory name
                (presumably the total feature count -- confirm).
        """
        io_start = time.perf_counter()
        common_path = "/root/sf-benchmark/data/{}w_{}d_3pc/independent_linear.".format(
            sample_num, total_num
        )
        # Each party reads its own numbered shard of the dataset.
        shard_suffix = {self.alice: "1.csv", self.bob: "2.csv", self.carol: "3.csv"}
        shard_paths = {
            party: common_path + suffix for party, suffix in shard_suffix.items()
        }
        vdf = v_read_csv(shard_paths, keys='id', drop_keys='id')

        # Split the label column out of the dataset.
        # <<< !!! >>> change 'y' if the label column has a different name.
        label_data = vdf["y"]
        # v_data keeps all the feature columns.
        v_data = vdf.drop(columns="y")
        # <<< !!! >>> change alice if the label is not held by alice.
        y = reveal(label_data.partitions[self.alice].data)

        # Block until every feature partition is loaded, so the I/O timing is complete.
        pending = [part.data for part in v_data.partitions.values()]
        wait(pending)
        io_end = time.perf_counter()
        print("io takes time", io_end - io_start)

        self.run_sgb("independent_linear", v_data, label_data, y, True, 1, 1)
def run_test(party):
    """Run the linear-dataset SGB benchmark as ``party``, then shut SecretFlow down.

    Args:
        party: Name of the party this process acts as; forwarded to ``setup_sf``.
    """
    # setup_sf stores its *_ip arguments as each SPU node's 'address', so pass
    # the address string rather than the whole {'address': ...} dict.
    # NOTE(review): these are the same host:port pairs given to sf.init; if an
    # SPU device is ever built from cluster_def, confirm they may be shared.
    cluster_def, heu_config = setup_sf(
        party,
        _parties['alice']['address'],
        _parties['bob']['address'],
        _parties['carol']['address'],
    )
    try:
        test_suite = SGB_benchmark(cluster_def, heu_config)
        test_suite.test_on_linear(100, 100)
    finally:
        # Shut down even when the benchmark raises.
        sf.shutdown()
if __name__ == '__main__':
    # Script entry point: the single positional argument names the party
    # this process should run as.
    import argparse

    cli = argparse.ArgumentParser(prog='sgb benchmark remote')
    cli.add_argument('party')
    run_test(cli.parse_args().party)
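# "Comments" -- apparently a stray web-page label picked up when this script was copied; not part of the benchmark.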