Skip to content

Commit

Permalink
Merge pull request aws#562 from joguSD/generators
Browse files Browse the repository at this point in the history
Check comprehension generators in the analyzer
  • Loading branch information
joguSD authored Oct 13, 2017
2 parents 7f0b986 + eb242a7 commit 91ff9d2
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 35 deletions.
94 changes: 68 additions & 26 deletions chalice/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

APICallT = Dict[str, Set[str]]
OptASTSet = Optional[Set[ast.AST]]
ComprehensionNode = Union[ast.DictComp, ast.GeneratorExp, ast.ListComp]


def get_client_calls(source_code):
Expand Down Expand Up @@ -537,10 +538,7 @@ def visit_ClassDef(self, node):

def visit_DictComp(self, node):
# type: (ast.DictComp) -> None
# Not implemented yet. This creates a new scope,
# so we'd need to treat this similar to how we treat
# functions.
pass
self._handle_comprehension(node, 'dictcomp')

def visit_Return(self, node):
# type: (Any) -> None
Expand Down Expand Up @@ -569,33 +567,77 @@ def visit_GeneratorExp(self, node):
# with the name "genexpr".
self._handle_comprehension(node, 'genexpr')

def _handle_comprehension(self, node, comprehension_type):
# type: (Union[ast.ListComp, ast.GeneratorExp], str) -> None
child_scope = self._get_matching_sub_namespace(
comprehension_type, node.lineno)
def _visit_first_comprehension_generator(self, node):
# type: (ComprehensionNode) -> None
if node.generators:
# first generator's iterator is visited in the current scope
first_generator = node.generators[0]
self.visit(first_generator.iter)

def _collect_comprehension_children(self, node):
# type: (ComprehensionNode) -> List[ast.expr]
if isinstance(node, ast.DictComp):
# dict comprehensions have two values to be checked
child_nodes = [node.key, node.value]
else:
child_nodes = [node.elt]

if node.generators:
first_generator = node.generators[0]
child_nodes.append(first_generator.target)
for if_expr in first_generator.ifs:
child_nodes.append(if_expr)

for generator in node.generators[1:]:
# rest need to be visited in the child scope
child_nodes.append(generator.iter)
child_nodes.append(generator.target)
for if_expr in generator.ifs:
child_nodes.append(if_expr)
return child_nodes

def _visit_comprehension_children(self, node, comprehension_type):
# type: (ComprehensionNode, str) -> None
child_nodes = self._collect_comprehension_children(node)
child_scope = self._get_matching_sub_namespace(comprehension_type,
node.lineno)
if child_scope is None:
# If there's no child scope (listcomps in py2) then we can
# just analyze the node.elt node in the current scope instead
# of creating a new child scope.
self.visit(node.elt)
# In Python 2 there's no child scope for list comp
# Or we failed to locate the child scope, this happens in Python 2
# when there are multiple comprehensions of the same type in the
# same scope. The line number trick doesn't work as Python 2 always
# passes line number 0, make a best effort
for child_node in child_nodes:
try:
self.visit(child_node)
except KeyError:
pass
return
child_table = self._symbol_table.new_sub_table(child_scope)
child_infer = self._new_inference_scope(
ParsedCode(node.elt, child_table), self._binder, self._visited)
child_infer.bind_types()
for child_node in child_nodes:
# visit sub expressions in the child scope
child_table = self._symbol_table.new_sub_table(child_scope)
child_infer = self._new_inference_scope(
ParsedCode(child_node, child_table),
self._binder, self._visited)
child_infer.bind_types()

def _handle_comprehension(self, node, comprehension_type):
# type: (ComprehensionNode, str) -> None
self._visit_first_comprehension_generator(node)
self._visit_comprehension_children(node, comprehension_type)

def _get_matching_sub_namespace(self, name, lineno):
# type: (str, int) -> symtable.SymbolTable
namespaces = [
t for t in self._symbol_table.get_sub_namespaces()
if t.get_name() == name and t.get_lineno() == lineno]
if not namespaces:
return
# We're making a simplification and using the genexpr subnamespace.
# This has potential to miss a client call but we don't do
# inference on node.generators so this doesn't matter for now.
child_scope = namespaces[0]
return child_scope
namespaces = [t for t in self._symbol_table.get_sub_namespaces()
if t.get_name() == name]
if len(namespaces) == 1:
# if there's only one match for the name, return it
return namespaces[0]
for namespace in namespaces:
# otherwise disambiguate by using the line number
if namespace.get_lineno() == lineno:
return namespace
return None

def visit(self, node):
# type: (Any) -> None
Expand Down
62 changes: 53 additions & 9 deletions tests/unit/test_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,15 @@ def test_can_detect_calls_in_gen_expr():
""") == {'dynamodb': set(['list_tables'])}


def test_can_handle_gen_from_call():
assert aws_calls("""\
import boto3
service_name = 'dynamodb'
d = boto3.client('dynamodb')
(i for i in d.list_tables())
""") == {'dynamodb': set(['list_tables'])}


def test_can_detect_calls_in_multiple_gen_exprs():
assert aws_calls("""\
import boto3
Expand All @@ -493,6 +502,13 @@ def test_can_detect_calls_in_multiple_gen_exprs():
""") == {'dynamodb': set(['list_tables'])}


def test_multiple_gen_exprs():
assert aws_calls("""\
(i for i in [1,2,3])
(j for j in [1,2,3])
""") == {}


def test_can_handle_list_expr_with_api_calls():
assert aws_calls("""\
import boto3
Expand All @@ -506,9 +522,9 @@ def test_can_handle_multiple_listcomps():
bar_key = 'bar'
baz_key = 'baz'
items = [{'foo': 'sun', 'bar': 'moon', 'baz': 'stars'}]
foos = [t['foo'] for i in items]
bars = [t[bar_key] for t in items]
bazs = [t[baz_key] for t in items]
foos = [i['foo'] for i in items]
bars = [j[bar_key] for j in items]
bazs = [k[baz_key] for k in items]
""") == {}


Expand Down Expand Up @@ -566,12 +582,40 @@ def foo():
""") == {'s3': set(['list_buckets']),
'ec2': set(['describe_instances'])}

# def test_can_handle_dict_comp():
# assert aws_calls("""\
# import boto3
# ddb = boto3.client('dynamodb')
# tables = {t: t for t in ddb.list_tables()}
# """) == {'dynamodb': set(['list_tables'])}

def test_can_handle_dict_comp():
assert aws_calls("""\
import boto3
ddb = boto3.client('dynamodb')
tables = {t: t for t in ddb.list_tables()}
""") == {'dynamodb': set(['list_tables'])}


def test_can_handle_dict_comp_if():
assert aws_calls("""\
import boto3
ddb = boto3.client('dynamodb')
tables = {t: t for t in [1] if ddb.list_tables()}
""") == {'dynamodb': set(['list_tables'])}


def test_can_handle_comp_ifs():
assert aws_calls("""\
[(x,y) for x in [1,2,3,4] for y in [1,2,3,4] if x % 2 == 0]
""") == {}


def test_can_handle_dict_comp_ifs():
assert aws_calls("""\
import boto3
d = boto3.client('dynamodb')
{x: y for x in d.create_table()\
for y in d.update_table()\
if d.list_tables()}
{x: y for x in d.create_table()\
for y in d.update_table()\
if d.list_tables()}
""") == {'dynamodb': set(['list_tables', 'create_table', 'update_table'])}


# def test_tuple_assignment():
Expand Down

0 comments on commit 91ff9d2

Please sign in to comment.