Skip to content

Commit

Permalink
[cli] Add batch templates and fixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
fallonchen committed Oct 1, 2020
1 parent 74ad31c commit c032b53
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{#-
  Batch (non-streaming) job config template.
  Reads from the `klio` context: `job_name`, `pipeline_options` (project,
  worker_harness_container_image, optional experiments), `use_fnapi`, and
  `job_options.inputs` / `job_options.outputs`, whose items provide
  `event_location` and `data_location`.
  The `{%-` tags strip the preceding newline and indentation, so the
  indentation of the tag lines themselves does not reach the output.
-#}
job_name: {{ klio.job_name }}
version: 2
pipeline_options:
  streaming: False
  project: {{ klio.pipeline_options.project }}
  worker_harness_container_image: {{klio.pipeline_options.worker_harness_container_image}}
  {%- if klio.pipeline_options.experiments %}
  experiments:
  {%- for experiment in klio.pipeline_options.experiments %}
    - {{ experiment }}
  {%- endfor %}
  {%- endif %}
  runner: DirectRunner
  {%- if not klio.use_fnapi %}
  setup_file: setup.py # relative to job dir
  {%- endif %}
job_config:
  allow_non_klio_messages: False
  events:
    inputs:
    {%- for item in klio.job_options.inputs %}
      - type: file
        location: {{item.event_location}}
    {%- endfor %}
    outputs:
    {%- for item in klio.job_options.outputs %}
      - type: file
        location: {{item.event_location}}
    {%- endfor %}
  data:
    inputs:
    {%- for item in klio.job_options.inputs %}
      - type: file
        location: {{item.data_location}}
        # Remove/set to false when job is ready to process input existence checks
        skip_klio_existence_check: True
    {%- endfor %}
    outputs:
    {%- for item in klio.job_options.outputs %}
      - type: file
        location: {{item.data_location}}
        # Remove/set to false when job is ready to process output existence checks
        skip_klio_existence_check: True
    {%- endfor %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Notice: the code below is just an example of what can be done.

Feel free to import what's needed, including third-party libraries or
other self-written modules.
"""
import apache_beam as beam

from klio.transforms import decorators


class HelloKlio(beam.DoFn):
    """A simple DoFn."""

    @decorators.handle_klio
    def process(self, data):
        """Main entrypoint to a transform.

        Any errors raised here (explicitly or not), Klio will catch and
        log + drop the Klio message.

        For information on the Klio message, see
        https://klio.readthedocs.io/en/latest/userguide/pipeline/message.html

        For information on yielding information other than ``data``, see
        https://klio.readthedocs.io/en/latest/userguide/pipeline/state.html

        Args:
            data (KlioMessage.data): The data of the Klio message, which
                contains two attributes: ``data.element`` and
                ``data.payload``.
        Yields:
            KlioMessage.data: the same Klio message data object received.
        """
        element = data.element.decode("utf-8")
        input_config = self._klio.config.job_config.events.inputs[0]
        # Hand the values to the logger as arguments so the message is only
        # interpolated if the record is actually emitted.
        self._klio.logger.info(
            "Received '%s' from file '%s'", element, input_config.file_pattern
        )
        yield data

Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Batch job config fixture: FnAPI variant (beam_fn_api experiment, no setup_file).
job_name: test-job
version: 2
pipeline_options:
  streaming: False
  experiments:
    - beam_fn_api
  worker_harness_container_image: "gcr.io/test-gcp-project/test-job-worker"
  project: test-gcp-project
  runner: DirectRunner
job_config:
  allow_non_klio_messages: False
  events:
    inputs:
      - type: file
        location: test-job_input_elements.txt
    outputs:
      - type: file
        location: test-job_output_elements
  data:
    inputs:
      - type: file
        location: test-job-input
        # Remove/set to false when job is ready to process input existence checks
        skip_klio_existence_check: True
    outputs:
      - type: file
        location: test-job-output
        # Remove/set to false when job is ready to process output existence checks
        skip_klio_existence_check: True
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Batch job config fixture: setup.py variant (setup_file set, no experiments).
job_name: test-job
version: 2
pipeline_options:
  streaming: False
  worker_harness_container_image: "gcr.io/test-gcp-project/test-job-worker"
  project: test-gcp-project
  runner: DirectRunner
  setup_file: setup.py # relative to job dir
job_config:
  allow_non_klio_messages: False
  events:
    inputs:
      - type: file
        location: test-job_input_elements.txt
    outputs:
      - type: file
        location: test-job_output_elements
  data:
    inputs:
      - type: file
        location: test-job-input
        # Remove/set to false when job is ready to process input existence checks
        skip_klio_existence_check: True
    outputs:
      - type: file
        location: test-job-output
        # Remove/set to false when job is ready to process output existence checks
        skip_klio_existence_check: True
41 changes: 41 additions & 0 deletions cli/tests/commands/job/utils/fixtures/expected/transforms-batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Notice: the code below is just an example of what can be done.
Feel free to import what's needed, including third-party libraries or
other self-written modules.
"""
import apache_beam as beam

from klio.transforms import decorators


class HelloKlio(beam.DoFn):
    """A simple DoFn."""

    @decorators.handle_klio
    def process(self, data):
        """Main entrypoint to a transform.

        Any errors raised here (explicitly or not), Klio will catch and
        log + drop the Klio message.

        For information on the Klio message, see
        https://klio.readthedocs.io/en/latest/userguide/pipeline/message.html

        For information on yielding information other than ``data``, see
        https://klio.readthedocs.io/en/latest/userguide/pipeline/state.html

        Args:
            data (KlioMessage.data): The data of the Klio message, which
                contains two attributes: ``data.element`` and
                ``data.payload``.
        Yields:
            KlioMessage.data: the same Klio message data object received.
        """
        element = data.element.decode("utf-8")
        input_config = self._klio.config.job_config.events.inputs[0]
        # Hand the values to the logger as arguments so the message is only
        # interpolated if the record is actually emitted.
        self._klio.logger.info(
            "Received '%s' from file '%s'", element, input_config.file_pattern
        )
        yield data

0 comments on commit c032b53

Please sign in to comment.