Skip to content

Commit

Permalink
zk bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
jat001 committed Sep 6, 2021
1 parent 76f21da commit b32a998
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
10 changes: 8 additions & 2 deletions python/fate_flow/fate_flow_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import sys
import time
import traceback
import atexit

import grpc
from flask import Flask
Expand Down Expand Up @@ -108,8 +109,13 @@ def internal_server_error(e):
RuntimeConfig.init_env()
RuntimeConfig.set_process_role(ProcessRole.DRIVER)
PrivilegeAuth.init()
ServiceUtils.register()
ServiceUtils.register_models(models_group_by_party_model_id_and_model_version())

RuntimeConfig.zk_client = ServiceUtils.get_zk()
RuntimeConfig.zk_client.start()
atexit.register(RuntimeConfig.zk_client.stop)
ServiceUtils.register(RuntimeConfig.zk_client)
ServiceUtils.register_models(RuntimeConfig.zk_client, models_group_by_party_model_id_and_model_version())

ResourceManager.initialize()
Detector(interval=5 * 1000, logger=detect_logger).start()
DAGScheduler(interval=2 * 1000, logger=schedule_logger()).start()
Expand Down
4 changes: 3 additions & 1 deletion python/fate_flow/utils/model_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from fate_arch.common.file_utils import get_project_base_directory
from fate_flow.pipelined_model.pipelined_model import PipelinedModel

from fate_flow.entity.runtime_config import RuntimeConfig
from fate_flow.db.db_models import DB, MachineLearningModelInfo as MLModel
from fate_flow.utils.service_utils import ServiceUtils

Expand Down Expand Up @@ -178,7 +179,8 @@ def save_model_info(model_info):
rows = model.save(force_insert=True)
if rows != 1:
raise Exception("Create {} failed".format(MLModel))
ServiceUtils.register(gen_party_model_id(role=model.f_role,
ServiceUtils.register(RuntimeConfig.zk_client,
gen_party_model_id(role=model.f_role,
party_id=model.f_party_id,
model_id=model.f_model_id),
model.f_model_version)
Expand Down
14 changes: 4 additions & 10 deletions python/fate_flow/utils/service_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import atexit
import socket
from urllib import parse

Expand Down Expand Up @@ -72,13 +71,10 @@ def get_from_registry(cls, service_name):
raise Exception('loading servings node failed from zookeeper: {}'.format(e))

@classmethod
def register(cls, zk=None, party_model_id=None, model_version=None):
def register(cls, zk, party_model_id=None, model_version=None):
if not get_base_config('use_registry', False):
return
if not zk:
zk = ServiceUtils.get_zk()
zk.start()
atexit.register(zk.stop)

model_transfer_url = 'http://{}:{}{}'.format(IP, HTTP_PORT, FATE_FLOW_MODEL_TRANSFER_ENDPOINT)
if party_model_id is not None and model_version is not None:
model_transfer_url += '/{}/{}'.format(party_model_id.replace('#', '~'), model_version)
Expand All @@ -93,12 +89,10 @@ def register(cls, zk=None, party_model_id=None, model_version=None):
stat_logger.exception(e)

@classmethod
def register_models(cls, models):
def register_models(cls, zk, models):
if not get_base_config('use_registry', False):
return
zk = ServiceUtils.get_zk()
zk.start()
atexit.register(zk.stop)

for model in models:
cls.register(zk, model.f_party_model_id, model.f_model_version)

Expand Down

0 comments on commit b32a998

Please sign in to comment.