forked from aws-cloudformation/cfn-lint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DynamicReferenceSecureString.py
147 lines (126 loc) · 5.72 KB
/
DynamicReferenceSecureString.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import regex as re
import cfnlint.helpers
from cfnlint.rules import CloudFormationLintRule, RuleMatch
class DynamicReferenceSecureString(CloudFormationLintRule):
"""Check if Dynamic Reference Secure Strings are only used in the correct locations"""
id = "E1027"
shortdesc = "Check dynamic references secure strings are in supported locations"
description = (
"Dynamic References Secure Strings are only supported for a small set of resource properties. "
"Validate that they are being used in the correct location when checking values "
"and Fn::Sub in resource properties. Currently doesn't check outputs, maps, conditions, "
"parameters, and descriptions."
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html"
tags = ["functions", "dynamic reference"]
def __init__(self):
"""Init"""
super().__init__()
self.property_specs = []
self.resource_specs = []
self.exceptions = {
"AWS::DirectoryService::MicrosoftAD": "Password",
"AWS::DirectoryService::SimpleAD": "Password",
"AWS::ElastiCache::ReplicationGroup": "AuthToken",
"AWS::IAM::User.LoginProfile": "Password",
"AWS::KinesisFirehose::DeliveryStream.RedshiftDestinationConfiguration": "Password",
"AWS::OpsWorks::App.Source": "Password",
"AWS::OpsWorks::Stack.RdsDbInstance": "DbPassword",
"AWS::OpsWorks::Stack.Source": "Password",
"AWS::RDS::DBCluster": "MasterUserPassword",
"AWS::RDS::DBInstance": "MasterUserPassword",
"AWS::Redshift::Cluster": "MasterUserPassword",
}
def initialize(self, cfn):
"""Init"""
specs = cfnlint.helpers.RESOURCE_SPECS.get(cfn.regions[0])
self.property_specs = specs.get("PropertyTypes")
self.resource_specs = specs.get("ResourceTypes")
for resource_spec in self.resource_specs:
self.resource_property_types.append(resource_spec)
for property_spec in self.property_specs:
self.resource_sub_property_types.append(property_spec)
def check_dyn_ref_value(self, value, path):
"""Chec item type"""
matches = []
if isinstance(value, str):
if re.match(cfnlint.helpers.REGEX_DYN_REF_SSM_SECURE, value):
message = f'Dynamic Reference secure strings are not supported for this property at {"/".join(map(str, path[:]))}'
matches.append(RuleMatch(path[:], message))
return matches
def check_value(self, value, path, **kwargs):
"""Check Value"""
matches = []
item_type = kwargs.get("item_type", {})
if item_type in ["Map"]:
if isinstance(value, dict):
for map_key, map_value in value.items():
if not isinstance(map_value, dict):
matches.extend(
self.check_dyn_ref_value(map_value, path[:] + [map_key])
)
else:
matches.extend(self.check_dyn_ref_value(value, path[:]))
return matches
# pylint: disable=W0613
# Need to disable for the function to work
def check_sub(self, value, path, **kwargs):
"""Check Sub Function Dynamic References"""
matches = []
if isinstance(value, list):
if isinstance(value[0], str):
matches.extend(self.check_dyn_ref_value(value[0], path[:] + [0]))
else:
matches.extend(self.check_dyn_ref_value(value, path[:]))
return matches
def check(self, cfn, properties, specs, property_type, path):
"""Check itself"""
matches = []
for prop in properties:
if prop in specs:
if property_type in self.exceptions:
if prop == self.exceptions.get(property_type):
continue
primitive_type = specs.get(prop).get("PrimitiveType")
if not primitive_type:
primitive_type = specs.get(prop).get("PrimitiveItemType")
if specs.get(prop).get("Type") in ["List", "Map"]:
item_type = specs.get(prop).get("Type")
else:
item_type = None
if primitive_type:
matches.extend(
cfn.check_value(
properties,
prop,
path[:],
check_value=self.check_value,
check_sub=self.check_sub,
primitive_type=primitive_type,
item_type=item_type,
)
)
return matches
def match_resource_sub_properties(self, properties, property_type, path, cfn):
"""Match for sub properties"""
matches = []
if self.property_specs.get(property_type, {}).get("Properties"):
property_specs = self.property_specs.get(property_type, {}).get(
"Properties", {}
)
matches.extend(
self.check(cfn, properties, property_specs, property_type, path)
)
return matches
def match_resource_properties(self, properties, resource_type, path, cfn):
"""Check CloudFormation Properties"""
matches = []
resource_specs = self.resource_specs.get(resource_type, {}).get(
"Properties", {}
)
matches.extend(self.check(cfn, properties, resource_specs, resource_type, path))
return matches