improve service_conf.yaml format
jarviszeng-zjc committed Oct 20, 2020
1 parent eab088c commit 4a2a940
Showing 12 changed files with 99 additions and 79 deletions.
6 changes: 6 additions & 0 deletions conf/rabbitmq_route_table.yaml
@@ -0,0 +1,6 @@
+9999:
+  host: 192.168.0.4
+  port: 5672
+10000:
+  host: 192.168.0.3
+  port: 5672
55 changes: 26 additions & 29 deletions conf/service_conf.yaml
@@ -1,10 +1,14 @@
 work_mode: 1
-independent_scheduling_proxy: false
 use_registry: false
 fateflow:
+  # you must set a real IP address or 127.0.0.1; 0.0.0.0 is not supported
   host: 127.0.0.1
   http_port: 9380
   grpc_port: 9360
+  # support rollsite or nginx as the coordinate proxy; rollsite is recommended for FATE on EggRoll, nginx for FATE on Spark
+  # format (proxy: rollsite) means rollsite uses the rollsite configuration of fate_on_eggroll and nginx uses the nginx configuration of fate_on_spark
+  # you can also customize the config in the format (proxy:\n  name: rollsite\n  host: xx\n  port: xx)
+  proxy: rollsite
 fateboard:
   host: 127.0.0.1
   port: 8080
@@ -16,34 +20,32 @@ database:
   port: 3306
   max_connections: 100
   stale_timeout: 30
-EGGROLL:
-  address:
+fate_on_eggroll:
+  clustermanager:
+    cores_per_node: 20
+    nodes: 1
+  rollsite:
     host: 127.0.0.1
     port: 9370
-  cores_per_node: 20
-  nodes: 1
-SPARK:
-  address:
+fate_on_spark:
+  spark:
+    # default use SPARK_HOME environment variable
     home:
-  cores_per_node: 20
-  nodes: 2
-HDFS:
-  address:
+    cores_per_node: 20
+    nodes: 2
+  hdfs:
     name_node: hdfs://fate-cluster
+    # default /
     path_prefix:
-RABBITMQ:
-  address:
-    self:
-      9999: 192.168.0.4
-      mng_port: 12345
-      port: 5672
-      user: fate
-      password: fate
-    10000:
-      host: 192.168.0.3
-      port: 5672
-PROXY:
-  address:
+  rabbitmq:
+    host: 192.168.0.4
+    mng_port: 12345
+    port: 5672
+    user: fate
+    password: fate
+    # default conf/rabbitmq_route_table.yaml
+    route_table:
+  nginx:
     host: 127.0.0.1
     port: 9373
 model_store_address:
@@ -55,11 +57,6 @@ model_store_address:
   passwd: fate_dev
   max_connections: 10
   stale_timeout: 10
-data_storage_address:
-  user: fate_dev
-  passwd: fate_dev
-  host: 127.0.0.1
-  port: 3306
 servings:
   hosts:
     - 127.0.0.1:8000
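The fateflow comments above accept either a plain string (proxy: rollsite) or a full mapping. A minimal sketch of the customized mapping form, with illustrative host/port values that are not part of this commit:

    fateflow:
      host: 127.0.0.1
      http_port: 9380
      grpc_port: 9360
      # customized coordinate proxy: explicit name plus address; this takes the
      # dict branch of get_command_federation_channel() in grpc_utils.py below
      proxy:
        name: rollsite
        host: 127.0.0.1
        port: 9370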
2 changes: 1 addition & 1 deletion python/fate_arch/common/__init__.py
@@ -1 +1 @@
-from fate_arch.common._types import WorkMode, Backend, Party, FederatedMode, FederatedCommunicationType, EngineType
+from fate_arch.common._types import WorkMode, Backend, Party, FederatedMode, FederatedCommunicationType, EngineType, CoordinateProxyService
8 changes: 7 additions & 1 deletion python/fate_arch/common/_types.py
@@ -15,6 +15,7 @@ def is_cluster(self):
 class Backend(IntEnum):
     EGGROLL = 0
     SPARK = 1
+    STANDALONE = 1

     def is_spark(self):
         return self.value == self.SPARK
@@ -25,8 +26,13 @@ def is_eggroll(self):

 class EngineType(object):
     COMPUTING = "computing"
-    FEDERATION = "federation"
     STORAGE = "storage"
+    FEDERATION = "federation"
+
+
+class CoordinateProxyService(object):
+    rollsite = "rollsite"
+    nginx = 'nginx'


 class FederatedMode(object):
2 changes: 1 addition & 1 deletion python/fate_flow/controller/job_controller.py
@@ -82,7 +82,7 @@ def get_job_engines_address(cls, job_parameters: RunParameters):
         ]
         for engine_type, engine_name in engine_list:
             engine_info = ResourceManager.get_engine_registration_info(engine_type=engine_type, engine_name=engine_name)
-            job_parameters.engines_address[engine_type] = engine_info.f_engine_address
+            job_parameters.engines_address[engine_type] = engine_info.f_engine_config
             engines_info[engine_type] = engine_info
         return engines_info

5 changes: 3 additions & 2 deletions python/fate_flow/db/db_models.py
@@ -347,9 +347,10 @@ class Meta:


 class EngineRegistry(DataBaseModel):
-    f_engine_name = CharField(max_length=50, index=True)
     f_engine_type = CharField(max_length=10, index=True)
-    f_engine_address = JSONField()
+    f_engine_name = CharField(max_length=50, index=True)
+    f_engine_entrance = CharField(max_length=50, index=True)
+    f_engine_config = JSONField()
     f_cores = IntegerField(index=True)
     f_memory = IntegerField(index=True)  # MB
     f_remaining_cores = IntegerField(index=True)
57 changes: 29 additions & 28 deletions python/fate_flow/manager/resource_manager.py
@@ -24,44 +24,43 @@
 from fate_arch.computing import ComputingEngine
 from fate_flow.db.db_models import DB, EngineRegistry, Job
 from fate_flow.entity.types import ResourceOperation, RunParameters
-from fate_flow.settings import stat_logger, STANDALONE_BACKEND_VIRTUAL_CORES_PER_NODE, SUPPORT_ENGINES, \
+from fate_flow.settings import stat_logger, STANDALONE_BACKEND_VIRTUAL_CORES_PER_NODE, SUPPORT_BACKENDS_ENTRANCE, \
     MAX_CORES_PERCENT_PER_JOB, DEFAULT_TASK_CORES_PER_NODE
 from fate_flow.utils import job_utils


 class ResourceManager(object):
     @classmethod
     def initialize(cls):
-        for engine_type, engines_name in SUPPORT_ENGINES.items():
-            for engine_name in engines_name:
-                engine_info = get_base_config(engine_name, {})
-                if engine_info:
-                    engine_info["engine"] = engine_name
-                    cls.register_engine(engine_type=engine_type, engine_info=engine_info)
+        for backend_name, backend_engines in SUPPORT_BACKENDS_ENTRANCE.items():
+            for engine_type, engine_keys in backend_engines.items():
+                engine_config = get_base_config(backend_name, {}).get(engine_keys[1], {})
+                if engine_config:
+                    cls.register_engine(engine_type=engine_type, engine_name=engine_keys[0], engine_entrance=engine_keys[1], engine_config=engine_config)

         # initialize standalone engine
-        for engine_type in SUPPORT_ENGINES.keys():
-            engine_name = "STANDALONE"
-            engine_info = {
-                "engine": engine_name,
-                "nodes": 1,
-                "cores_per_node": STANDALONE_BACKEND_VIRTUAL_CORES_PER_NODE,
-            }
-            cls.register_engine(engine_type=engine_type, engine_info=engine_info)
+        for backend_engines in SUPPORT_BACKENDS_ENTRANCE.values():
+            for engine_type in backend_engines.keys():
+                engine_name = "STANDALONE"
+                engine_entrance = "fateflow"
+                engine_config = {
+                    "nodes": 1,
+                    "cores_per_node": STANDALONE_BACKEND_VIRTUAL_CORES_PER_NODE,
+                }
+                cls.register_engine(engine_type=engine_type, engine_name=engine_name, engine_entrance=engine_entrance, engine_config=engine_config)

     @classmethod
     @DB.connection_context()
-    def register_engine(cls, engine_type, engine_info):
-        nodes = engine_info.get("nodes", 1)
-        cores = engine_info.get("cores_per_node", 0) * nodes
-        memory = engine_info.get("memory_per_node", 0) * nodes
-        engine_name = engine_info.get("engine", "UNKNOWN")
-        engine_address = engine_info.get("address", {})
-        filters = [EngineRegistry.f_engine_name == engine_name, EngineRegistry.f_engine_type == engine_type]
+    def register_engine(cls, engine_type, engine_name, engine_entrance, engine_config):
+        nodes = engine_config.get("nodes", 1)
+        cores = engine_config.get("cores_per_node", 0) * nodes
+        memory = engine_config.get("memory_per_node", 0) * nodes
+        filters = [EngineRegistry.f_engine_type == engine_type, EngineRegistry.f_engine_name == engine_name]
         resources = EngineRegistry.select().where(*filters)
         if resources:
             resource = resources[0]
             update_fields = {}
-            update_fields[EngineRegistry.f_engine_address] = engine_address
+            update_fields[EngineRegistry.f_engine_config] = engine_config
             update_fields[EngineRegistry.f_cores] = cores
             update_fields[EngineRegistry.f_memory] = memory
             update_fields[EngineRegistry.f_remaining_cores] = EngineRegistry.f_remaining_cores + (
@@ -72,15 +71,17 @@ def register_engine(cls, engine_info):
             operate = EngineRegistry.update(update_fields).where(*filters)
             update_status = operate.execute() > 0
             if update_status:
-                stat_logger.info(f"update {engine_type} engine {engine_name} registration information")
+                stat_logger.info(f"update {engine_type} engine {engine_name} {engine_entrance} registration information")
             else:
-                stat_logger.info(f"update {engine_type} engine {engine_name} registration information takes no effect")
+                stat_logger.info(f"update {engine_type} engine {engine_name} {engine_entrance} registration information takes no effect")
         else:
             resource = EngineRegistry()
             resource.f_create_time = base_utils.current_timestamp()
-            resource.f_engine_name = engine_name
             resource.f_engine_type = engine_type
-            resource.f_engine_address = engine_address
+            resource.f_engine_name = engine_name
+            resource.f_engine_entrance = engine_entrance
+            resource.f_engine_config = engine_config
+
             resource.f_cores = cores
             resource.f_memory = memory
             resource.f_remaining_cores = cores
@@ -90,7 +91,7 @@ def register_engine(cls, engine_info):
                 resource.save(force_insert=True)
             except Exception as e:
                 stat_logger.warning(e)
-            stat_logger.info(f"create {engine_type} engine {engine_name} registration information")
+            stat_logger.info(f"create {engine_type} engine {engine_name} {engine_entrance} registration information")

     @classmethod
     def check_resource_apply(cls, job_parameters: RunParameters, engines_info):
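To make the new registration flow concrete, a small sketch (not from the commit) of how initialize() resolves one engine; the mapping is SUPPORT_BACKENDS_ENTRANCE from fate_flow/settings.py (diffed below) and the values are the defaults from service_conf.yaml above:

    from fate_arch.common import EngineType
    from fate_arch.common.conf_utils import get_base_config
    from fate_flow.settings import SUPPORT_BACKENDS_ENTRANCE

    # fate_on_spark declares its storage engine as (StorageEngine.HDFS, "hdfs")
    engine_name, engine_entrance = SUPPORT_BACKENDS_ENTRANCE["fate_on_spark"][EngineType.STORAGE]
    # engine configs are now read from the backend's own section, e.g. fate_on_spark.hdfs
    engine_config = get_base_config("fate_on_spark", {}).get(engine_entrance, {})
    # with the default conf: {"name_node": "hdfs://fate-cluster", "path_prefix": None}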
6 changes: 0 additions & 6 deletions python/fate_flow/operation/job_saver.py
@@ -197,12 +197,6 @@ def query_job(cls, reverse=None, order_by=None, **kwargs):
             # not allow query all job
             return []

-    @classmethod
-    @DB.connection_context()
-    def query_start_timeout_job(cls, timeout):
-        jobs = Job.select().where(Job.f_status == JobStatus.WAITING, Job.f_cores > 0, Job.f_update_time < current_timestamp() - timeout)
-        return [job for job in jobs]
-
     @classmethod
     @DB.connection_context()
     def get_tasks_asc(cls, job_id, role, party_id):
2 changes: 0 additions & 2 deletions python/fate_flow/operation/task_executor.py
@@ -151,8 +151,6 @@ def run_task(cls):
             sess.init_federation(federation_session_id=federation_session_id,
                                  runtime_conf=component_parameters_on_party,
                                  service_conf=job_parameters.engines_address.get(EngineType.FEDERATION, {}))
-            print(job_parameters.federation_engine)
-            print(job_parameters.engines_address.get(EngineType.FEDERATION, {}))
             sess.as_default()

             schedule_logger().info('Run {} {} {} {} {} task'.format(job_id, component_name, task_id, role, party_id))
15 changes: 11 additions & 4 deletions python/fate_flow/settings.py
@@ -30,10 +30,17 @@
 MODEL_STORE_ADDRESS = get_base_config("model_store_address", {})

 # storage engine is used for component output data
-SUPPORT_ENGINES = {
-    EngineType.COMPUTING: [ComputingEngine.EGGROLL, ComputingEngine.SPARK],
-    EngineType.FEDERATION: [FederationEngine.EGGROLL, FederationEngine.RABBITMQ],
-    EngineType.STORAGE: [StorageEngine.EGGROLL, StorageEngine.HDFS]
+SUPPORT_BACKENDS_ENTRANCE = {
+    "fate_on_eggroll": {
+        EngineType.COMPUTING: (ComputingEngine.EGGROLL, "clustermanager"),
+        EngineType.STORAGE: (StorageEngine.EGGROLL, "clustermanager"),
+        EngineType.FEDERATION: (FederationEngine.EGGROLL, "rollsite"),
+    },
+    "fate_on_spark": {
+        EngineType.COMPUTING: (ComputingEngine.SPARK, "spark"),
+        EngineType.STORAGE: (StorageEngine.HDFS, "hdfs"),
+        EngineType.FEDERATION: (FederationEngine.RABBITMQ, "rabbitmq"),
+    },
 }

 # upload data
2 changes: 1 addition & 1 deletion python/fate_flow/utils/api_utils.py
@@ -69,7 +69,7 @@ def remote_api(job_id, method, endpoint, src_party_id, dest_party_id, src_role,
     exception = None
     for t in range(try_times):
         try:
-            engine, channel, stub = get_command_federation_channel()
+            channel, stub = get_command_federation_channel()
             # _return = stub.unaryCall(_packet)
             _return, _call = stub.unaryCall.with_call(_packet, metadata=_routing_metadata, timeout=(overall_timeout/1000))
             audit_logger(job_id).info("grpc api response: {}".format(_return))
18 changes: 14 additions & 4 deletions python/fate_flow/utils/grpc_utils.py
@@ -17,23 +17,33 @@
 import json

 from fate_arch.common.log import audit_logger
+from fate_arch.common import CoordinateProxyService
 from fate_flow.utils.proto_compatibility import basic_meta_pb2
 from fate_flow.utils.proto_compatibility import proxy_pb2, proxy_pb2_grpc
 import grpc

 from fate_flow.settings import FATEFLOW_SERVICE_NAME, IP, GRPC_PORT, HEADERS, DEFAULT_GRPC_OVERALL_TIMEOUT, stat_logger
 from fate_flow.entity.runtime_config import RuntimeConfig
 from fate_flow.utils.node_check_utils import nodes_check
-from fate_arch.common import conf_utils
+from fate_arch.common.conf_utils import get_base_config


 def get_command_federation_channel():
-    engine = "PROXY" if get_base_config("independent_scheduling_proxy", False) else "EGGROLL"
-    address = conf_utils.get_base_config(engine).get("address")
+    proxy_config = get_base_config("fateflow", {}).get("proxy", None)
+    if isinstance(proxy_config, str):
+        if proxy_config == CoordinateProxyService.rollsite:
+            address = get_base_config("fate_on_eggroll", {}).get(proxy_config)
+        elif proxy_config == CoordinateProxyService.nginx:
+            address = get_base_config("fate_on_spark", {}).get(proxy_config)
+        else:
+            raise RuntimeError(f"can not support coordinate proxy {proxy_config}")
+    elif isinstance(proxy_config, dict):
+        address = proxy_config
+    else:
+        raise RuntimeError(f"can not support coordinate proxy config {proxy_config}")
     channel = grpc.insecure_channel('{}:{}'.format(address.get("host"), address.get("port")))
     stub = proxy_pb2_grpc.DataTransferServiceStub(channel)
-    return engine, channel, stub
+    return channel, stub


 def get_routing_metadata(src_party_id, dest_party_id):
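A brief usage sketch of the changed helper (not from the commit): the proxy is now resolved internally from the fateflow.proxy setting, so callers such as remote_api in api_utils.py unpack only (channel, stub); _packet and _routing_metadata are assumed to be built by the caller:

    channel, stub = get_command_federation_channel()  # no engine value returned anymore
    try:
        _return, _call = stub.unaryCall.with_call(_packet, metadata=_routing_metadata)
    finally:
        channel.close()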
