From b32a998589d1e98b5219a407cdeee77e9d853007 Mon Sep 17 00:00:00 2001
From: Jat
Date: Mon, 6 Sep 2021 17:37:30 +0800
Subject: [PATCH] zk bugfix

---
 python/fate_flow/fate_flow_server.py    | 10 ++++++++--
 python/fate_flow/utils/model_utils.py   |  4 +++-
 python/fate_flow/utils/service_utils.py | 14 ++++----------
 3 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/python/fate_flow/fate_flow_server.py b/python/fate_flow/fate_flow_server.py
index f75959367e..b5d70036f3 100644
--- a/python/fate_flow/fate_flow_server.py
+++ b/python/fate_flow/fate_flow_server.py
@@ -18,6 +18,7 @@
 import sys
 import time
 import traceback
+import atexit
 
 import grpc
 from flask import Flask
@@ -108,8 +109,13 @@ def internal_server_error(e):
     RuntimeConfig.init_env()
     RuntimeConfig.set_process_role(ProcessRole.DRIVER)
     PrivilegeAuth.init()
-    ServiceUtils.register()
-    ServiceUtils.register_models(models_group_by_party_model_id_and_model_version())
+
+    RuntimeConfig.zk_client = ServiceUtils.get_zk()
+    RuntimeConfig.zk_client.start()
+    atexit.register(RuntimeConfig.zk_client.stop)
+    ServiceUtils.register(RuntimeConfig.zk_client)
+    ServiceUtils.register_models(RuntimeConfig.zk_client, models_group_by_party_model_id_and_model_version())
+
     ResourceManager.initialize()
     Detector(interval=5 * 1000, logger=detect_logger).start()
     DAGScheduler(interval=2 * 1000, logger=schedule_logger()).start()
diff --git a/python/fate_flow/utils/model_utils.py b/python/fate_flow/utils/model_utils.py
index aef2b0277d..3cba82d398 100644
--- a/python/fate_flow/utils/model_utils.py
+++ b/python/fate_flow/utils/model_utils.py
@@ -25,6 +25,7 @@
 from fate_arch.common.file_utils import get_project_base_directory
 
 from fate_flow.pipelined_model.pipelined_model import PipelinedModel
+from fate_flow.entity.runtime_config import RuntimeConfig
 from fate_flow.db.db_models import DB, MachineLearningModelInfo as MLModel
 from fate_flow.utils.service_utils import ServiceUtils
 
@@ -178,7 +179,8 @@ def save_model_info(model_info):
         rows = model.save(force_insert=True)
         if rows != 1:
             raise Exception("Create {} failed".format(MLModel))
-        ServiceUtils.register(gen_party_model_id(role=model.f_role,
+        ServiceUtils.register(RuntimeConfig.zk_client,
+                              gen_party_model_id(role=model.f_role,
                                                  party_id=model.f_party_id,
                                                  model_id=model.f_model_id),
                               model.f_model_version)
diff --git a/python/fate_flow/utils/service_utils.py b/python/fate_flow/utils/service_utils.py
index 419c396e0f..e5a040447f 100644
--- a/python/fate_flow/utils/service_utils.py
+++ b/python/fate_flow/utils/service_utils.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import atexit
 import socket
 from urllib import parse
 
@@ -72,13 +71,10 @@ def get_from_registry(cls, service_name):
             raise Exception('loading servings node failed from zookeeper: {}'.format(e))
 
     @classmethod
-    def register(cls, zk=None, party_model_id=None, model_version=None):
+    def register(cls, zk, party_model_id=None, model_version=None):
         if not get_base_config('use_registry', False):
             return
-        if not zk:
-            zk = ServiceUtils.get_zk()
-            zk.start()
-            atexit.register(zk.stop)
+
         model_transfer_url = 'http://{}:{}{}'.format(IP, HTTP_PORT, FATE_FLOW_MODEL_TRANSFER_ENDPOINT)
         if party_model_id is not None and model_version is not None:
             model_transfer_url += '/{}/{}'.format(party_model_id.replace('#', '~'), model_version)
@@ -93,12 +89,10 @@ def register(cls, zk=None, party_model_id=None, model_version=None):
             stat_logger.exception(e)
 
     @classmethod
-    def register_models(cls, models):
+    def register_models(cls, zk, models):
         if not get_base_config('use_registry', False):
             return
-        zk = ServiceUtils.get_zk()
-        zk.start()
-        atexit.register(zk.stop)
+
         for model in models:
             cls.register(zk, model.f_party_model_id, model.f_model_version)
 