[lib] rewrite KlioReadFromBigQuery as reader + map transform
Dan Simon committed Nov 20, 2020
1 parent 0ba9b6e commit 3b0d2f2
Showing 2 changed files with 111 additions and 57 deletions.
133 changes: 76 additions & 57 deletions lib/src/klio/transforms/io.py
@@ -20,7 +20,6 @@
 
 from apache_beam.io import avroio as beam_avroio
 from apache_beam.io.gcp import bigquery as beam_bq
-from apache_beam.io.gcp import bigquery_tools as beam_bq_tools
 
 from klio_core.proto import klio_pb2
 
@@ -118,12 +117,15 @@ class KlioReadFromText(beam.io.ReadFromText, _KlioTransformMixin):
     _source_class = _KlioReadFromTextSource
 
 
-class _KlioBigQueryReader(beam_bq_tools.BigQueryReader):
-    def __init__(self, *args, klio_message_columns=None, **kwargs):
-        super(_KlioBigQueryReader, self).__init__(*args, **kwargs)
+class _KlioReadFromBigQueryMapper(object):
+    """Wrapper class providing a ``beam.Map`` that converts a row from
+    ``ReadFromBigQuery`` into a properly formatted ``KlioMessage``.
+    """
+
+    def __init__(self, klio_message_columns=None):
         self.__klio_message_columns = klio_message_columns
 
-    def __generate_klio_message(self):
+    def _generate_klio_message(self):
         message = klio_pb2.KlioMessage()
         message.version = klio_pb2.Version.V2
         message.metadata.intended_recipients.anyone.SetInParent()
@@ -142,7 +144,7 @@ def __generate_klio_message(self):
         # Potentially.
         return message
 
-    def __iter__(self):
+    def _map_row_element(self, row):
         # NOTE: this assumes that the coder being used (default is
         # beam.io.gcp.bigquery_tools.RowAsDictJsonCoder, otherwise set in
         # klio-job.yaml) is JSON serializable (since the default is just
@@ -151,47 +153,52 @@ def __iter__(self):
         # NOTE: We need to have the row elements be bytes, so if it is
         # a dictionary, we json.dumps into a str to convert to bytes,
         # but that may need to change if we want to support other coders
-        for row in super(_KlioBigQueryReader, self).__iter__():
-            message = self.__generate_klio_message()
-            data = {}
-            if self.__klio_message_columns:
-                if len(self.__klio_message_columns) == 1:
-                    data = row[self.__klio_message_columns[0]]
+        data = {}
+        if self.__klio_message_columns:
+            if len(self.__klio_message_columns) == 1:
+                data = row[self.__klio_message_columns[0]]
+            else:
+                for key, value in row.items():
+                    if key in self.__klio_message_columns:
+                        data[key] = value
+                data = json.dumps(data)
 
-                else:
-                    for key, value in row.items():
-                        if key in self.__klio_message_columns:
-                            data[key] = value
-                    data = json.dumps(data)
+        else:
+            data = json.dumps(row)
+        return data
 
-            else:
-                data = json.dumps(row)
+    def _map_row(self, row):
+        message = self._generate_klio_message()
+        message.data.element = bytes(self._map_row_element(row), "utf-8")
+        return message.SerializeToString()
 
-            message.data.element = bytes(data, "utf-8")
-            yield message.SerializeToString()
+    def as_beam_map(self):
+        return "Convert to KlioMessage" >> beam.Map(self._map_row)


-# Note: copy-pasting the docstrings of `BigQuerySource` so that we can
+# Note: copy-pasting the docstrings of `ReadFromBigQuery` so that we can
 # include our added parameter (`klio_message_columns`) in the API
 # documentation (via autodoc). If we don't do this, then just the parent
 # documentation will be shown, excluding our new parameter.
-class KlioReadFromBigQuery(beam_bq.BigQuerySource, _KlioTransformMixin):
-    """Read from BigQuery with each row as a ``KlioMessage.data.element``.
+class KlioReadFromBigQuery(beam.PTransform, _KlioTransformMixin):
+    """Read data from BigQuery.
+
+    This PTransform uses a BigQuery export job to take a snapshot of the
+    table on GCS, and then reads from each produced file. File format is
+    Avro by default.
 
     Args:
-        table (str): The ID of a BigQuery table. If specified all data of the
-            table will be used as input of the current source. The ID must contain
-            only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores
-            ``_``. If dataset and query arguments are :data:`None` then the table
-            argument must contain the entire table reference specified as:
-            ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``.
+        table (str, callable, ValueProvider): The ID of the table, or a callable
+            that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
+            numbers ``0-9``, or underscores ``_``. If the dataset argument is
+            :data:`None` then the table argument must contain the entire table
+            reference specified as: ``'DATASET.TABLE'`` or
+            ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must return a
+            TableReference, or a string table name as specified above.
         dataset (str): The ID of the dataset containing this table or
             :data:`None` if the table reference is specified entirely by the table
-            argument or a query is specified.
-        project (str): The ID of the project containing this table or
-            :data:`None` if the table reference is specified entirely by the table
-            argument or a query is specified.
+            argument.
+        project (str): The ID of the project containing this table.
         klio_message_columns (list): A list of fields (``str``) that should
             be assigned to ``KlioMessage.data.element``.
@@ -203,20 +210,18 @@ class KlioReadFromBigQuery(beam_bq.BigQuerySource, _KlioTransformMixin):
             "field2": "bar"}'``). If only one field is provided, just the
             value will be assigned to ``KlioMessage.data.element``.
-        query (str): A query to be used instead of arguments table, dataset, and
-            project.
+        query (str, ValueProvider): A query to be used instead of arguments
+            table, dataset, and project.
         validate (bool): If :data:`True`, various checks will be done when source
             gets initialized (e.g., is table present?). This should be
             :data:`True` for most scenarios in order to catch errors as early as
             possible (pipeline construction instead of pipeline execution). It
             should be :data:`False` if the table is created during pipeline
             execution by a previous step.
         coder (~apache_beam.coders.coders.Coder): The coder for the table
-            rows if serialized to disk. If :data:`None`, then the default coder is
-            :class:`~apache_beam.io.gcp.bigquery_tools.RowAsDictJsonCoder`,
-            which will interpret every line in a file as a JSON serialized
-            dictionary. This argument needs a value only in special cases when
-            returning table rows as dictionaries is not desirable.
+            rows. If :data:`None`, then the default coder is
+            _JsonToDictCoder, which will interpret every row as a JSON
+            serialized dictionary.
         use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL
             dialect for this query. The default value is :data:`False`.
             If set to :data:`True`, the query will use BigQuery's updated SQL
@@ -225,24 +230,38 @@ class KlioReadFromBigQuery(beam_bq.BigQuerySource, _KlioTransformMixin):
         flatten_results (bool): Flattens all nested and repeated fields in the
             query results. The default value is :data:`True`.
         kms_key (str): Optional Cloud KMS key name for use when creating new
-            tables.
-    """
-
-    _REQUIRES_IO_READ_WRAP = True
+            temporary tables.
+        gcs_location (str, ValueProvider): The name of the Google Cloud Storage
+            bucket where the extracted table should be written as a string or
+            a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+            :data:`None`, then the temp_location parameter is used.
+        bigquery_job_labels (dict): A dictionary with string labels to be passed
+            to BigQuery export and query jobs created by this transform. See:
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration
+        use_json_exports (bool): By default, this transform works by exporting
+            BigQuery data into Avro files, and reading those files. With this
+            parameter, the transform will instead export to JSON files. JSON files
+            are slower to read due to their larger size. When using JSON exports,
+            the BigQuery types for DATE, DATETIME, TIME, and TIMESTAMP will be
+            exported as strings. This behavior is consistent with BigQuerySource.
+            When using Avro exports, these fields will be exported as native
+            Python types (datetime.date, datetime.datetime, datetime.time, and
+            datetime.datetime respectively). Avro exports are recommended.
+            To learn more about BigQuery types, and Time-related type
+            representations, see:
+            https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
+            To learn more about type conversions between BigQuery and Avro, see:
+            https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#avro_conversions
+    """
 
     def __init__(self, *args, klio_message_columns=None, **kwargs):
-        super(KlioReadFromBigQuery, self).__init__(*args, **kwargs)
-        self.__klio_message_columns = klio_message_columns
+        self.__reader = beam_bq.ReadFromBigQuery(*args, **kwargs)
+        self.__mapper = _KlioReadFromBigQueryMapper(klio_message_columns)
 
-    def reader(self, test_bigquery_client=None):
-        return _KlioBigQueryReader(
-            source=self,
-            test_bigquery_client=test_bigquery_client,
-            use_legacy_sql=self.use_legacy_sql,
-            flatten_results=self.flatten_results,
-            kms_key=self.kms_key,
-            klio_message_columns=self.__klio_message_columns,
-        )
+    def expand(self, pcoll):
+        return pcoll | self.__reader | self.__mapper.as_beam_map()
 
 
 class KlioWriteToBigQuery(beam.io.WriteToBigQuery, _KlioTransformMixin):
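Since `KlioReadFromBigQuery` is now a composite `PTransform` (the stock Beam reader piped into the mapper above), it is applied like any other transform. A hypothetical usage sketch (the table name, column, and downstream step are made up; a real run would also need a GCS temp/export location configured in the pipeline options):

```python
import apache_beam as beam

from klio.transforms import io as io_transforms

with beam.Pipeline() as p:
    (
        p
        | io_transforms.KlioReadFromBigQuery(
            table="my-project:my_dataset.my_table",
            klio_message_columns=["entity_id"],
        )
        # Each element is now a serialized KlioMessage (bytes).
        | "Log elements" >> beam.Map(print)
    )
```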
35 changes: 35 additions & 0 deletions lib/tests/unit/transforms/test_io.py
@@ -19,6 +19,7 @@
 import tempfile
 
 import apache_beam as beam
+import pytest
 
 from apache_beam.testing import test_pipeline
 
@@ -116,3 +117,37 @@ def test_read_from_avro():
     )
 
     assert io_transforms.KlioReadFromAvro._REQUIRES_IO_READ_WRAP is True
+
+
+def test_bigquery_mapper_generate_klio_message():
+    mapper = io_transforms._KlioReadFromBigQueryMapper()
+    message = mapper._generate_klio_message()
+
+    assert message.version == klio_pb2.Version.V2
+    assert (
+        message.metadata.intended_recipients.WhichOneof("recipients")
+        == "anyone"
+    )
+
+
+@pytest.mark.parametrize(
+    "klio_message_columns,row,expected",
+    (
+        (["one_column"], {"a": "A", "b": "B", "one_column": "value"}, "value"),
+        (
+            ["a", "b"],
+            {"a": "A", "b": "B", "c": "C"},
+            json.dumps({"a": "A", "b": "B"}),
+        ),
+        (None, {"a": "A", "b": "B"}, json.dumps({"a": "A", "b": "B"})),
+    ),
+)
+def test_bigquery_mapper_map_row_element(klio_message_columns, row, expected):
+    mapper = io_transforms._KlioReadFromBigQueryMapper(
+        klio_message_columns=klio_message_columns
+    )
+
+    actual = mapper._map_row_element(row)
+
+    assert actual == expected
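These tests cover `_generate_klio_message` and `_map_row_element` individually; a round-trip test of `_map_row` itself would be a natural follow-up. A sketch of such a test (hypothetical, not part of this commit; it relies on the test module's existing `json`, `klio_pb2`, and `io_transforms` imports):

```python
def test_bigquery_mapper_map_row():
    mapper = io_transforms._KlioReadFromBigQueryMapper()

    # _map_row should emit a serialized KlioMessage whose payload is the
    # JSON-encoded row (no columns configured, so the whole row is used).
    serialized = mapper._map_row({"a": "A"})

    message = klio_pb2.KlioMessage()
    message.ParseFromString(serialized)

    assert message.version == klio_pb2.Version.V2
    assert message.data.element == json.dumps({"a": "A"}).encode("utf-8")
```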
