Skip to content

Commit

Permalink
Fix a bug when applying pipeline update
Browse files Browse the repository at this point in the history
The updated IR should be stored as a property, not custom property

PiperOrigin-RevId: 394614056
  • Loading branch information
goutham authored and tfx-copybara committed Sep 3, 2021
1 parent 53c5c6c commit 7c2e78f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
2 changes: 1 addition & 1 deletion tfx/orchestration/experimental/core/pipeline_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ def apply_pipeline_update(self) -> None:
code=status_lib.Code.INVALID_ARGUMENT,
message='No updated pipeline IR to apply')
data_types_utils.set_metadata_value(
self._execution.custom_properties[_PIPELINE_IR], updated_pipeline_ir)
self._execution.properties[_PIPELINE_IR], updated_pipeline_ir)
del self._execution.custom_properties[_UPDATED_PIPELINE_IR]
self.pipeline = _base64_decode_pipeline(updated_pipeline_ir)

Expand Down
13 changes: 12 additions & 1 deletion tfx/orchestration/experimental/core/pipeline_state_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,28 @@ def test_update_initiation_and_apply(self):
with self._mlmd_connection as m:
pipeline = _test_pipeline('pipeline1', param=1)
updated_pipeline = _test_pipeline('pipeline1', param=2)

# Initiate pipeline update.
with pstate.PipelineState.new(m, pipeline) as pipeline_state:
self.assertFalse(pipeline_state.is_update_initiated())
pipeline_state.initiate_update(updated_pipeline)
self.assertTrue(pipeline_state.is_update_initiated())

# Reload from MLMD and verify.
# Reload from MLMD and verify update initiation followed by applying the
# pipeline update.
with pstate.PipelineState.load(
m, task_lib.PipelineUid.from_pipeline(pipeline)) as pipeline_state:
self.assertTrue(pipeline_state.is_update_initiated())
self.assertEqual(pipeline, pipeline_state.pipeline)
pipeline_state.apply_pipeline_update()
# Verify in-memory state after update application.
self.assertFalse(pipeline_state.is_update_initiated())
self.assertTrue(pipeline_state.is_active())
self.assertEqual(updated_pipeline, pipeline_state.pipeline)

# Reload from MLMD and verify update application was correctly persisted.
with pstate.PipelineState.load(
m, task_lib.PipelineUid.from_pipeline(pipeline)) as pipeline_state:
self.assertFalse(pipeline_state.is_update_initiated())
self.assertTrue(pipeline_state.is_active())
self.assertEqual(updated_pipeline, pipeline_state.pipeline)
Expand Down

0 comments on commit 7c2e78f

Please sign in to comment.