forked from Checkmk/checkmk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathagent_aws
144 lines (115 loc) · 4.88 KB
/
agent_aws
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
# Copyright (C) 2019 tribe29 GmbH - License: GNU General Public License v2
# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and
# conditions defined in the file COPYING, which is part of this source code package.
from typing import Any, Mapping, Optional, Sequence
def _get_tag_options(tag_values, prefix):
options = []
for key, values in tag_values:
options.append("--%s-tag-key" % prefix)
options.append(key)
options.append("--%s-tag-values" % prefix)
options += values
return options
def _get_services_config(services):
# '--services': {
# 's3': {'selection': ('tags', [('KEY', ['VAL1', 'VAL2'])])},
# 'ec2': {'selection': 'all'},
# 'ebs': {'selection': ('names', ['ebs1', 'ebs2'])},
# }
service_args = []
for service_name, service_config in services.items():
if service_config is None:
continue
if service_config.get("limits"):
service_args += ["--%s-limits" % service_name]
selection = service_config.get("selection")
if not isinstance(selection, tuple):
# Here: value of selection is 'all' which means there's no
# restriction (names or tags) to the instances of a specific
# AWS service. The commandline option already includes this
# service '--services SERVICE1 SERVICE2 ...' (see below).
continue
selection_type, selection_values = selection
if not selection_values:
continue
if selection_type == "names":
service_args.append("--%s-names" % service_name)
service_args += selection_values
elif selection_type == "tags":
service_args += _get_tag_options(selection_values, service_name)
return service_args
def _proxy_args(details: Mapping[str, Any]) -> Sequence[Any]:
proxy_args = ["--proxy-host", details["proxy_host"]]
if proxy_port := details.get("proxy_port"):
proxy_args += ["--proxy-port", str(proxy_port)]
if (proxy_user := details.get("proxy_user")) and (proxy_pwd := details.get("proxy_password")):
proxy_args += [
"--proxy-user",
proxy_user,
"--proxy-password",
passwordstore_get_cmdline("%s", proxy_pwd),
]
return proxy_args
def agent_aws_arguments( # pylint: disable=too-many-branches
params: Mapping[str, Any], hostname: str, ipaddress: Optional[str]
) -> Sequence[Any]:
args = [
"--access-key-id",
params["access_key_id"],
"--secret-access-key",
passwordstore_get_cmdline("%s", params["secret_access_key"]),
*(_proxy_args(params["proxy_details"]) if "proxy_details" in params else []),
]
if params.get("assume_role"):
args += ["--assume-role"]
role_arn_id = params["assume_role"].get("role_arn_id")
if role_arn_id:
if role_arn_id[0]:
args += ["--role-arn", role_arn_id[0]]
if role_arn_id[1]:
args += ["--external-id", role_arn_id[1]]
regions = params.get("regions")
if regions:
args.append("--regions")
args += regions
global_services = params.get("global_services", {})
if global_services:
args.append("--global-services")
# We need to sort the inner services-as-a-dict-params
# in order to create reliable tests
args += sorted(global_services)
args += _get_services_config(global_services)
services = params.get("services", {})
# for backwards compatibility
if "cloudwatch" in services:
services["cloudwatch_alarms"] = services["cloudwatch"]
del services["cloudwatch"]
if services:
args.append("--services")
# We need to sort the inner services-as-a-dict-params
# in order to create reliable tests
args += sorted(services)
args += _get_services_config(services)
if "requests" in services.get("s3", {}):
args += ["--s3-requests"]
alarms = services.get("cloudwatch_alarms", {}).get("alarms")
if alarms:
# {'alarms': 'all'} is handled by no additionally specified names
args += ["--cloudwatch-alarms"]
if isinstance(alarms, tuple):
args += alarms[1]
if "cloudfront" in services.get("wafv2", {}):
args += ["--wafv2-cloudfront"]
if "cloudfront" in global_services:
cloudfront_host_assignment = global_services["cloudfront"]["host_assignment"]
args += ["--cloudfront-host-assignment", cloudfront_host_assignment]
# '--overall-tags': [('KEY_1', ['VAL_1', 'VAL_2']), ...)],
args += _get_tag_options(params.get("overall_tags", []), "overall")
args += [
"--hostname",
hostname,
]
args.extend(("--piggyback-naming-convention", params["piggyback_naming_convention"]))
return args
special_agent_info["aws"] = agent_aws_arguments