forked from Yelp/paasta
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flinkeks_tools.py
90 lines (77 loc) · 3 KB
/
flinkeks_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from typing import Optional
import service_configuration_lib
from paasta_tools.flink_tools import FlinkDeploymentConfig
from paasta_tools.flink_tools import FlinkDeploymentConfigDict
from paasta_tools.utils import BranchDictV2
from paasta_tools.utils import deep_merge_dictionaries
from paasta_tools.utils import DEFAULT_SOA_DIR
from paasta_tools.utils import load_service_instance_config
from paasta_tools.utils import load_v2_deployments_json
class FlinkEksDeploymentConfig(FlinkDeploymentConfig):
config_dict: FlinkDeploymentConfigDict
config_filename_prefix = "flinkeks"
def __init__(
self,
service: str,
cluster: str,
instance: str,
config_dict: FlinkDeploymentConfigDict,
branch_dict: Optional[BranchDictV2],
soa_dir: str = DEFAULT_SOA_DIR,
) -> None:
super().__init__(
cluster=cluster,
instance=instance,
service=service,
soa_dir=soa_dir,
config_dict=config_dict,
branch_dict=branch_dict,
)
def load_flinkeks_instance_config(
service: str,
instance: str,
cluster: str,
load_deployments: bool = True,
soa_dir: str = DEFAULT_SOA_DIR,
) -> FlinkEksDeploymentConfig:
"""Read a service instance's configuration for Flink on EKS.
If a branch isn't specified for a config, the 'branch' key defaults to
paasta-${cluster}.${instance}.
:param service: The service name
:param instance: The instance of the service to retrieve
:param cluster: The cluster to read the configuration for
:param load_deployments: A boolean indicating if the corresponding deployments.json for this service
should also be loaded
:param soa_dir: The SOA configuration directory to read from
:returns: A dictionary of whatever was in the config for the service instance"""
general_config = service_configuration_lib.read_service_configuration(
service, soa_dir=soa_dir
)
instance_config = load_service_instance_config(
service, instance, "flinkeks", cluster, soa_dir=soa_dir
)
general_config = deep_merge_dictionaries(
overrides=instance_config, defaults=general_config
)
branch_dict: Optional[BranchDictV2] = None
if load_deployments:
deployments_json = load_v2_deployments_json(service, soa_dir=soa_dir)
temp_instance_config = FlinkEksDeploymentConfig(
service=service,
cluster=cluster,
instance=instance,
config_dict=general_config,
branch_dict=None,
soa_dir=soa_dir,
)
branch = temp_instance_config.get_branch()
deploy_group = temp_instance_config.get_deploy_group()
branch_dict = deployments_json.get_branch_dict(service, branch, deploy_group)
return FlinkEksDeploymentConfig(
service=service,
cluster=cluster,
instance=instance,
config_dict=general_config,
branch_dict=branch_dict,
soa_dir=soa_dir,
)