forked from aws-cloudformation/cfn-lint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathJoin.py
166 lines (146 loc) · 7.56 KB
/
Join.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from cfnlint.rules import CloudFormationLintRule
from cfnlint.rules import RuleMatch
from cfnlint.helpers import RESOURCE_SPECS, VALID_PARAMETER_TYPES_LIST
class Join(CloudFormationLintRule):
"""Check if Join values are correct"""
id = 'E1022'
shortdesc = 'Join validation of parameters'
description = 'Making sure the join function is properly configured'
source_url = 'https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference-join.html'
tags = ['functions', 'join']
def __init__(self):
"""Initialize the rule"""
super(Join, self).__init__()
self.list_supported_functions = []
self.singular_supported_functions = []
for intrinsic_type, intrinsic_value in RESOURCE_SPECS.get('us-east-1').get('IntrinsicTypes').items():
if 'List' in intrinsic_value.get('ReturnTypes', []):
self.list_supported_functions.append(intrinsic_type)
if 'Singular' in intrinsic_value.get('ReturnTypes', []):
self.singular_supported_functions.append(intrinsic_type)
def _get_parameters(self, cfn):
"""Get all Parameter Names"""
results = {}
parameters = cfn.template.get('Parameters', {})
if isinstance(parameters, dict):
for param_name, param_values in parameters.items():
# This rule isn't here to check the Types but we need
# something valid if it doesn't exist
results[param_name] = param_values.get('Type', 'String')
return results
def _normalize_getatt(self, getatt):
""" Normalize getatt into an array"""
if isinstance(getatt, str):
return getatt.split('.', 1)
return getatt
def _is_ref_a_list(self, parameter, template_parameters):
""" Is a Ref a list """
list_params = [
'AWS::NotificationARNs',
]
if parameter in template_parameters:
if template_parameters.get(parameter) in VALID_PARAMETER_TYPES_LIST:
return True
if parameter in list_params:
return True
return False
def _is_getatt_a_list(self, parameter, get_atts):
""" Is a GetAtt a List """
for resource, attributes in get_atts.items():
for attribute_name, attribute_values in attributes.items():
if resource == parameter[0] and attribute_name == '*':
if attribute_values.get('PrimitiveItemType'):
return 'FALSE'
if attribute_values.get('Type') == 'List':
return 'TRUE'
return 'UNKNOWN'
if resource == parameter[0] and attribute_name == parameter[1]:
if attribute_values.get('Type') == 'List':
return 'TRUE'
return 'FALSE'
def _match_string_objs(self, join_string_objs, cfn, path):
""" Check join list """
matches = []
template_parameters = self._get_parameters(cfn)
get_atts = cfn.get_valid_getatts()
if isinstance(join_string_objs, dict):
if len(join_string_objs) == 1:
for key, value in join_string_objs.items():
if key not in self.list_supported_functions:
message = 'Fn::Join unsupported function for {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
elif key in ['Ref']:
if not self._is_ref_a_list(value, template_parameters):
message = 'Fn::Join must use a list at {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
elif key in ['Fn::GetAtt']:
if self._is_getatt_a_list(self._normalize_getatt(value), get_atts) == 'FALSE':
message = 'Fn::Join must use a list at {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
else:
message = 'Join list of values should be singular for {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
elif not isinstance(join_string_objs, list):
message = 'Join list of values for {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
else:
for string_obj in join_string_objs:
if isinstance(string_obj, dict):
if len(string_obj) == 1:
for key, value in string_obj.items():
if key not in self.singular_supported_functions:
message = 'Join unsupported function for {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
elif key in ['Ref']:
if self._is_ref_a_list(value, template_parameters):
message = 'Fn::Join must not be a list at {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
elif key in ['Fn::GetAtt']:
if self._is_getatt_a_list(self._normalize_getatt(value), get_atts) == 'TRUE':
message = 'Fn::Join must not be a list at {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
else:
message = 'Join list of values should be singular for {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
elif not isinstance(string_obj, str):
message = 'Join list of singular function or string for {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
return matches
def match(self, cfn):
matches = []
join_objs = cfn.search_deep_keys('Fn::Join')
for join_obj in join_objs:
join_value_obj = join_obj[-1]
path = join_obj[:-1]
if isinstance(join_value_obj, list):
if len(join_value_obj) == 2:
join_string = join_value_obj[0]
join_string_objs = join_value_obj[1]
if not isinstance(join_string, str):
message = 'Join string has to be of type string for {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
matches.extend(self._match_string_objs(join_string_objs, cfn, path))
else:
message = 'Join should be an array of 2 for {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
else:
message = 'Join should be an array of 2 for {0}'
matches.append(RuleMatch(
path, message.format('/'.join(map(str, path)))))
return matches