Skip to content

Commit

Permalink
(incremental-meta-2) Introduce layer of serializable classes for conf…
Browse files Browse the repository at this point in the history
…ig type system

Summary:
This introduces a layer of serializable classes that can
capture all the information necessary to rehydrate and return results
for a graphql query.

The trickiest part of this is the treatment of generics (Set,
List, Optional, and Tuple). I added some helpers in the core config type
system to clarify verbiage. I classify them as "closed generic types".

We want to store the types in a normalized form, but each instance
of the generic type is not a normalized type instance. So we have
a notion of inner types, that is a list of all the contained types
within a given type (e.g. field types for Dicts and type params
for closed generics) but recursively until the algo hits non-generic
types, which are considered leaves.

This makes it so that with a dictionary lookup by key, and all of the
inner types, one can, for example, accurately render a type.

Test Plan: BK

Reviewers: max, alangenfeld

Reviewed By: max

Differential Revision: https://dagster.phacility.com/D1469
  • Loading branch information
schrockn committed Nov 23, 2019
1 parent 198beca commit 676fe7b
Show file tree
Hide file tree
Showing 5 changed files with 505 additions and 10 deletions.
21 changes: 21 additions & 0 deletions python_modules/dagster/dagster/core/meta/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'''
This module contains serializable classes that contain all the meta information
in our definitions and type systems. The purpose is to be able to power our graphql
interface without having access to all the definitions and types in process.
This will have a number of uses, but the most immediately germane are:
1) Persist *historical* pipeline and repository structures. This
will enable, in the short term, for the user to be able to go to a historical
run and view the meta information at that point in time.
2) Cache the meta information about a repository when, for example, interacting
with a repository that is not in the same process but is instead resident in
a container. This way one does not have to query the meta information about
a pipeline/repository in the critical path of a user interaction.
I suspect also that the ability to hash the entire construct of meta information
to identify the repository will also end up being quite useful.
-- schrockn 11-22-2019
'''
190 changes: 190 additions & 0 deletions python_modules/dagster/dagster/core/meta/config_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
from collections import namedtuple

from dagster import check
from dagster.core.serdes import whitelist_for_serdes
from dagster.core.types.config import ConfigType, ConfigTypeKind
from dagster.core.types.field import Field


@whitelist_for_serdes
class NonGenericTypeRefMeta(namedtuple('_NonGenericTypeRefMeta', 'key')):
    '''
    Serializable reference to a non-generic config type, identified by its key.
    '''

    def __new__(cls, key):
        # Validate up front so a non-string key fails at construction time.
        checked_key = check.str_param(key, 'key')
        return super(NonGenericTypeRefMeta, cls).__new__(cls, checked_key)


@whitelist_for_serdes
class ConfigTypeMeta(
    namedtuple(
        '_ConfigTypeMeta',
        'kind key name description is_builtin is_system_config '
        'type_param_refs '  # only valid for closed generics (Set, Tuple, List, Optional)
        'enum_values '  # only valid for enums
        'fields',  # only valid for dicts and selectors
    )
):
    '''
    Serializable description of a single config type.

    Args:
        kind (ConfigTypeKind): The kind of the config type.
        key (str): The key of the config type.
        name (Optional[str]): The name of the config type.
        type_param_refs (Optional[List[TypeRef]]): Refs to the type parameters, or None.
        enum_values (Optional[List[ConfigEnumValueMeta]]): The enum values, or None.
        fields (Optional[List[ConfigFieldMeta]]): The fields, or None.
        description (Optional[str]): Description of the type.
        is_builtin (bool): Whether the type is a builtin.
        is_system_config (bool): Whether the type is system config.
    '''

    # NOTE(review): the parameter order below differs from the namedtuple field
    # order. That is safe here only because every value is forwarded to the base
    # class by keyword. Anything that re-invokes __new__ positionally in field
    # order (e.g. pickle/copy through namedtuple's __getnewargs__) would bind
    # values to the wrong parameters -- confirm whether that matters to callers.
    def __new__(
        cls,
        kind,
        key,
        name,
        type_param_refs,
        enum_values,
        fields,
        description,
        is_builtin,
        is_system_config,
    ):
        return super(ConfigTypeMeta, cls).__new__(
            cls,
            kind=check.inst_param(kind, 'kind', ConfigTypeKind),
            key=check.str_param(key, 'key'),
            name=check.opt_str_param(name, 'name'),
            # The three list-valued members are passed through as None when
            # absent rather than being coerced to empty lists.
            type_param_refs=None
            if type_param_refs is None
            else check.list_param(type_param_refs, 'type_param_refs', of_type=TypeRef),
            enum_values=None
            if enum_values is None
            else check.list_param(enum_values, 'enum_values', of_type=ConfigEnumValueMeta),
            fields=None
            if fields is None
            else check.list_param(fields, 'fields', of_type=ConfigFieldMeta),
            description=check.opt_str_param(description, 'description'),
            is_builtin=check.bool_param(is_builtin, 'is_builtin'),
            is_system_config=check.bool_param(is_system_config, 'is_system_config'),
        )

    @property
    def inner_type_refs(self):
        '''
        This recurses through the type references with non-generic types as leaves.

        Returns:
            list: One ref per distinct key reachable from this type's next-level
            refs; when several refs share a key, the first one encountered is kept.
        '''
        # there might be duplicate keys (esp for scalars)
        refs_by_key = {}

        # _get_next_level_refs may return None (or an empty list); treat both as
        # "nothing to walk".
        for next_level in _get_next_level_refs(self) or []:
            for ref in _recurse_through_generics(next_level):
                if ref.key not in refs_by_key:
                    refs_by_key[ref.key] = ref

        return list(refs_by_key.values())


def _get_next_level_refs(ref):
    '''
    Get the refs exactly one level below ``ref``.

    This function is used by the recursive descent through all the inner
    types. It does *not* recursively descend through the type parameters of
    generic types; it only returns the next level of types.

    Returns:
        The direct ``type_param_refs`` when ``ref`` is a closed generic type;
        a list of the ``type_ref`` of every field when ``ref`` is a kind with
        fields and ``ref.fields`` is non-empty; otherwise None.
    '''
    kind = ref.kind

    # if a generic type, get type params
    if ConfigTypeKind.is_closed_generic(kind):
        return ref.type_param_refs

    # if a type with fields, get refs of the fields
    # (still check fields because permissive)
    if not (ConfigTypeKind.has_fields(kind) and ref.fields):
        return None

    return [field_meta.type_ref for field_meta in ref.fields]


def _recurse_through_generics(ref):
    '''
    Yield ``ref`` itself, then every ref nested in its type parameters.

    Only ConfigTypeMeta instances whose kind is a closed generic are descended
    into; any other ref is yielded as a leaf.
    '''
    yield ref
    if isinstance(ref, ConfigTypeMeta) and ConfigTypeKind.is_closed_generic(ref.kind):
        # ConfigTypeMeta accepts type_param_refs=None, so guard against it here
        # instead of raising a TypeError while iterating.
        for type_param_ref in ref.type_param_refs or ():
            for inner_ref in _recurse_through_generics(type_param_ref):
                yield inner_ref


# A type reference in these serializable data structures is one of two things:
# 1) A closed generic type (e.g. List[Int] or Optional[Set[str]])
# 2) Or a reference to a non-generic type, such as Dict, Selector, or a Scalar.
# Upon deserialization and when hydrated back to the graphql query, it will
# be the responsibility of that module to maintain a dictionary of the
# non-generic types and then do lookups into the dictionary in order
# to explode the entire type hierarchy requested by the client.

TypeRef = (ConfigTypeMeta, NonGenericTypeRefMeta)


@whitelist_for_serdes
class ConfigEnumValueMeta(namedtuple('_ConfigEnumValueMeta', 'value description')):
    '''
    Serializable description of one enum value: its string value and an
    optional description.
    '''

    def __new__(cls, value, description):
        checked_value = check.str_param(value, 'value')
        checked_description = check.opt_str_param(description, 'description')
        return super(ConfigEnumValueMeta, cls).__new__(
            cls, value=checked_value, description=checked_description
        )


@whitelist_for_serdes
class ConfigFieldMeta(
    namedtuple(
        '_ConfigFieldMeta',
        'name type_ref is_optional default_provided default_value_as_str description',
    )
):
    '''
    Serializable description of a single config field.

    ``type_ref`` must be one of the TypeRef classes; ``name``,
    ``default_value_as_str`` and ``description`` may be None.
    '''

    def __new__(
        cls, name, type_ref, is_optional, default_provided, default_value_as_str, description
    ):
        # Validate in declaration order, then hand the checked values to the tuple.
        checked_name = check.opt_str_param(name, 'name')
        checked_type_ref = check.inst_param(type_ref, 'type_ref', TypeRef)
        checked_is_optional = check.bool_param(is_optional, 'is_optional')
        checked_default_provided = check.bool_param(default_provided, 'default_provided')
        checked_default_str = check.opt_str_param(default_value_as_str, 'default_value_as_str')
        checked_description = check.opt_str_param(description, 'description')

        return super(ConfigFieldMeta, cls).__new__(
            cls,
            name=checked_name,
            type_ref=checked_type_ref,
            is_optional=checked_is_optional,
            default_provided=checked_default_provided,
            default_value_as_str=checked_default_str,
            description=checked_description,
        )


def meta_from_field(name, field):
    '''
    Build the ConfigFieldMeta for ``field`` under the given ``name``.

    ``default_value_as_str`` is only read from the field when the field
    reports that a default was provided; otherwise None is stored.
    '''
    check.str_param(name, 'name')
    check.inst_param(field, 'field', Field)

    type_ref = type_ref_of(field.config_type)
    is_optional = field.is_optional
    default_provided = field.default_provided

    if field.default_provided:
        default_value_as_str = field.default_value_as_str
    else:
        default_value_as_str = None

    return ConfigFieldMeta(
        name=name,
        type_ref=type_ref,
        is_optional=is_optional,
        default_provided=default_provided,
        default_value_as_str=default_value_as_str,
        description=field.description,
    )


def type_ref_of(config_type):
    '''
    Return the type ref for ``config_type``.

    Closed generic types are represented by their full ConfigTypeMeta; every
    other type is represented by a NonGenericTypeRefMeta carrying only its key.
    '''
    check.inst_param(config_type, 'config_type', ConfigType)

    if not ConfigTypeKind.is_closed_generic(config_type.kind):
        return NonGenericTypeRefMeta(key=config_type.key)

    return meta_from_config_type(config_type)


def type_refs_of(type_list):
    '''
    Map ``type_ref_of`` over ``type_list``, passing None through unchanged.
    '''
    if type_list is None:
        return None
    return [type_ref_of(config_type) for config_type in type_list]


def meta_from_config_type(config_type):
    '''
    Build the ConfigTypeMeta for ``config_type``.

    ``enum_values`` is populated only when the type reports ``is_enum``;
    ``fields`` only when it reports ``is_selector`` or ``is_composite``.
    Both are None otherwise.
    '''
    check.inst_param(config_type, 'config_type', ConfigType)

    type_param_refs = type_refs_of(config_type.type_params)

    enum_values = None
    if config_type.is_enum:
        enum_values = [
            ConfigEnumValueMeta(ev.config_value, ev.description) for ev in config_type.enum_values
        ]

    fields = None
    if config_type.is_selector or config_type.is_composite:
        fields = [meta_from_field(name, field) for name, field in config_type.fields.items()]

    return ConfigTypeMeta(
        key=config_type.key,
        name=config_type.name,
        kind=config_type.kind,
        description=config_type.description,
        is_builtin=config_type.is_builtin,
        is_system_config=config_type.is_system_config,
        type_param_refs=type_param_refs,
        enum_values=enum_values,
        fields=fields,
    )
72 changes: 62 additions & 10 deletions python_modules/dagster/dagster/core/types/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,29 @@ class ConfigTypeKind(PythonEnum):
DICT = 'DICT'
PERMISSIVE_DICT = 'PERMISSIVE_DICT'

@staticmethod
def has_fields(kind):
    '''True when ``kind`` is SELECTOR, DICT, or PERMISSIVE_DICT.'''
    kinds_with_fields = (
        ConfigTypeKind.SELECTOR,
        ConfigTypeKind.DICT,
        ConfigTypeKind.PERMISSIVE_DICT,
    )
    return any(kind == candidate for candidate in kinds_with_fields)

# Closed generic types
LIST = 'LIST'
NULLABLE = 'NULLABLE'
SET = 'SET'
TUPLE = 'TUPLE'

@staticmethod
def is_closed_generic(kind):
    '''True when ``kind`` is LIST, NULLABLE, SET, or TUPLE.'''
    closed_generic_kinds = (
        ConfigTypeKind.LIST,
        ConfigTypeKind.NULLABLE,
        ConfigTypeKind.SET,
        ConfigTypeKind.TUPLE,
    )
    return any(kind == candidate for candidate in closed_generic_kinds)


class ConfigTypeAttributes(namedtuple('_ConfigTypeAttributes', 'is_builtin is_system_config')):
def __new__(cls, is_builtin=False, is_system_config=False):
Expand All @@ -42,7 +60,15 @@ class ConfigType(object):
The class backing DagsterTypes as they are used processing configuration data.
'''

def __init__(self, key, name, kind, type_attributes=DEFAULT_TYPE_ATTRIBUTES, description=None):
def __init__(
self,
key,
name,
kind,
type_attributes=DEFAULT_TYPE_ATTRIBUTES,
description=None,
type_params=None,
):

type_obj = type(self)
if type_obj in ConfigType.__cache:
Expand All @@ -59,6 +85,11 @@ def __init__(self, key, name, kind, type_attributes=DEFAULT_TYPE_ATTRIBUTES, des
self.type_attributes = check.inst_param(
type_attributes, 'type_attributes', ConfigTypeAttributes
)
self.type_params = (
check.list_param(type_params, 'type_params', of_type=ConfigType)
if type_params
else None
)

__cache = {}

Expand All @@ -73,6 +104,12 @@ def from_builtin_enum(builtin_enum):
check.invariant(BuiltinEnum.contains(builtin_enum), 'param must be member of BuiltinEnum')
return _CONFIG_MAP[builtin_enum]

@property
def is_closed_generic(self):
    '''
    Whether this is an instantiated List, Tuple, Set, or Nullable,
    e.g. List[Int] or Tuple[Int, Str].
    '''
    own_kind = self.kind
    return ConfigTypeKind.is_closed_generic(own_kind)

@property
def is_system_config(self):
return self.type_attributes.is_system_config
Expand Down Expand Up @@ -123,7 +160,7 @@ def inner_types(self):

@property
def is_enum(self):
return self.kind == ConfigTypeKind.SET
return self.kind == ConfigTypeKind.ENUM

@property
def is_permissive_composite(self):
Expand All @@ -145,10 +182,19 @@ def is_config_scalar_valid(self, _config_value):
check.not_implemented('must implement')


class ConfigList(ConfigType):
class _ConfigClosedGeneric(ConfigType):
    '''
    Base class that requires ``type_params`` to be a list of ConfigType and
    forwards it, along with all other keyword arguments, to ConfigType.
    '''

    def __init__(self, type_params, **kwargs):
        # Report the real parameter name ('type_params') if validation fails.
        checked_type_params = check.list_param(type_params, 'type_params', of_type=ConfigType)
        super(_ConfigClosedGeneric, self).__init__(type_params=checked_type_params, **kwargs)


class ConfigList(_ConfigClosedGeneric):
def __init__(self, inner_type, **kwargs):
self.inner_type = check.inst_param(inner_type, 'inner_type', ConfigType)
super(ConfigList, self).__init__(kind=ConfigTypeKind.LIST, **kwargs)
super(ConfigList, self).__init__(
type_params=[inner_type], kind=ConfigTypeKind.LIST, **kwargs
)

@property
def inner_types(self):
Expand All @@ -158,7 +204,7 @@ def inner_types(self):
class ConfigSet(ConfigType):
def __init__(self, inner_type, **kwargs):
self.inner_type = check.inst_param(inner_type, 'inner_type', ConfigType)
super(ConfigSet, self).__init__(kind=ConfigTypeKind.SET, **kwargs)
super(ConfigSet, self).__init__(type_params=[inner_type], kind=ConfigTypeKind.SET, **kwargs)

@property
def inner_types(self):
Expand All @@ -167,20 +213,24 @@ def inner_types(self):

class ConfigTuple(ConfigType):
def __init__(self, tuple_types, **kwargs):
self.tuple_types = check.list_param(tuple_types, 'tuple_types', of_type=ConfigType)
super(ConfigTuple, self).__init__(kind=ConfigTypeKind.TUPLE, **kwargs)
self.tuple_types = tuple_types
super(ConfigTuple, self).__init__(
type_params=tuple_types, kind=ConfigTypeKind.TUPLE, **kwargs
)

@property
def inner_types(self):
return self.tuple_types + [
return self.type_params + [
inner_type for tuple_type in self.tuple_types for inner_type in tuple_type.inner_types
]


class ConfigNullable(ConfigType):
def __init__(self, inner_type, **kwargs):
self.inner_type = check.inst_param(inner_type, 'inner_type', ConfigType)
super(ConfigNullable, self).__init__(kind=ConfigTypeKind.NULLABLE, **kwargs)
super(ConfigNullable, self).__init__(
type_params=[inner_type], kind=ConfigTypeKind.NULLABLE, **kwargs
)

@property
def inner_types(self):
Expand Down Expand Up @@ -327,8 +377,10 @@ def Tuple(tuple_types):
class _Tuple(ConfigTuple):
def __init__(self):

# https://github.com/dagster-io/dagster/issues/1932
# TODO Naming these is a dubious decision
name = 'Tuple[{tuple_types}]'.format(
tuple_types=', '.join([tuple_type.name for tuple_type in tuple_types])
tuple_types=', '.join([tuple_type.key for tuple_type in tuple_types])
)

super(_Tuple, self).__init__(
Expand Down
Empty file.
Loading

0 comments on commit 676fe7b

Please sign in to comment.