Skip to content

Commit

Permalink
launch dockerized postgres in a common/reusable way
Browse files Browse the repository at this point in the history
Summary: Extend D3713 to launch docker on local dev env.

Test Plan: unit / bk

Reviewers: sashank

Reviewed By: sashank

Differential Revision: https://dagster.phacility.com/D3779
  • Loading branch information
ajnadel committed Jul 7, 2020
1 parent 8048b79 commit a50f780
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .buildkite/hooks/pre-exit
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ docker-compose stop
docker-compose rm -f
popd

pushd python_modules/dagster-graphql/dagster_graphql_tests/graphql
docker-compose stop
docker-compose rm -f
popd

pushd python_modules/libraries/dagster-postgres/dagster_postgres_tests/
docker-compose stop
docker-compose rm -f
Expand Down
16 changes: 15 additions & 1 deletion .buildkite/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,20 @@ def postgres_extra_cmds_fn(_):
]


def graphql_pg_extra_cmds_fn(_):
return [
"pushd python_modules/dagster-graphql/dagster_graphql_tests/graphql/",
"docker-compose up -d --remove-orphans", # clean up in hooks/pre-exit,
# Can't use host networking on buildkite and communicate via localhost
# between these sibling containers, so pass along the ip.
network_buildkite_container('postgres'),
connect_sibling_docker_container(
'postgres', 'test-postgres-db-graphql', 'POSTGRES_TEST_DB_HOST'
),
"popd",
]


# Some Dagster packages have more involved test configs or support only certain Python version;
# special-case those here
DAGSTER_PACKAGES_WITH_CUSTOM_TESTS = [
Expand Down Expand Up @@ -234,7 +248,7 @@ def postgres_extra_cmds_fn(_):
),
ModuleBuildSpec(
'python_modules/dagster-graphql',
extra_cmds_fn=postgres_extra_cmds_fn,
extra_cmds_fn=graphql_pg_extra_cmds_fn,
tox_file='tox_postgres.ini',
buildkite_label='dagster-graphql-postgres',
tox_env_suffixes=[
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
version: "3.7"

services:
test-postgres-db-graphql:
image: postgres:11
container_name: test-postgres-db-graphql
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: "test"
POSTGRES_USER: "test"
POSTGRES_DB: "test"
networks:
- postgres

networks:
postgres:
driver: bridge
name: postgres
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,18 @@ def readonly_postgres_instance():
@contextmanager
def _readonly_postgres_instance():
with seven.TemporaryDirectory() as temp_dir:
yield DagsterInstance(
instance_type=InstanceType.EPHEMERAL,
local_artifact_storage=LocalArtifactStorage(temp_dir),
run_storage=TestPostgresInstance.clean_run_storage(),
event_storage=TestPostgresInstance.clean_event_log_storage(),
compute_log_manager=LocalComputeLogManager(temp_dir),
run_launcher=ExplodingRunLauncher(),
schedule_storage=TestPostgresInstance.clean_schedule_storage(),
)
with TestPostgresInstance.docker_service_up(
file_relative_path(__file__, 'docker-compose.yml'), 'test-postgres-db-graphql'
):
yield DagsterInstance(
instance_type=InstanceType.EPHEMERAL,
local_artifact_storage=LocalArtifactStorage(temp_dir),
run_storage=TestPostgresInstance.clean_run_storage(),
event_storage=TestPostgresInstance.clean_event_log_storage(),
compute_log_manager=LocalComputeLogManager(temp_dir),
run_launcher=ExplodingRunLauncher(),
schedule_storage=TestPostgresInstance.clean_schedule_storage(),
)

return MarkedManager(
_readonly_postgres_instance, [Marks.postgres_instance, Marks.readonly],
Expand Down Expand Up @@ -184,15 +187,18 @@ def postgres_instance_with_sync_run_launcher():
@contextmanager
def _postgres_instance():
with seven.TemporaryDirectory() as temp_dir:
yield DagsterInstance(
instance_type=InstanceType.EPHEMERAL,
local_artifact_storage=LocalArtifactStorage(temp_dir),
run_storage=TestPostgresInstance.clean_run_storage(),
event_storage=TestPostgresInstance.clean_event_log_storage(),
compute_log_manager=LocalComputeLogManager(temp_dir),
run_launcher=SyncInMemoryRunLauncher(),
schedule_storage=TestPostgresInstance.clean_schedule_storage(),
)
with TestPostgresInstance.docker_service_up(
file_relative_path(__file__, 'docker-compose.yml'), 'test-postgres-db-graphql'
):
yield DagsterInstance(
instance_type=InstanceType.EPHEMERAL,
local_artifact_storage=LocalArtifactStorage(temp_dir),
run_storage=TestPostgresInstance.clean_run_storage(),
event_storage=TestPostgresInstance.clean_event_log_storage(),
compute_log_manager=LocalComputeLogManager(temp_dir),
run_launcher=SyncInMemoryRunLauncher(),
schedule_storage=TestPostgresInstance.clean_schedule_storage(),
)

return MarkedManager(
_postgres_instance, [Marks.postgres_instance, Marks.sync_run_launcher],
Expand All @@ -203,19 +209,22 @@ def postgres_instance_with_cli_api_run_launcher():
@contextmanager
def _postgres_instance_with_cli_api_hijack():
with seven.TemporaryDirectory() as temp_dir:
instance = DagsterInstance(
instance_type=InstanceType.EPHEMERAL,
local_artifact_storage=LocalArtifactStorage(temp_dir),
run_storage=TestPostgresInstance.clean_run_storage(),
event_storage=TestPostgresInstance.clean_event_log_storage(),
compute_log_manager=LocalComputeLogManager(temp_dir),
run_launcher=CliApiRunLauncher(),
schedule_storage=TestPostgresInstance.clean_schedule_storage(),
)
try:
yield instance
finally:
instance.run_launcher.join()
with TestPostgresInstance.docker_service_up(
file_relative_path(__file__, 'docker-compose.yml'), 'test-postgres-db-graphql'
):
instance = DagsterInstance(
instance_type=InstanceType.EPHEMERAL,
local_artifact_storage=LocalArtifactStorage(temp_dir),
run_storage=TestPostgresInstance.clean_run_storage(),
event_storage=TestPostgresInstance.clean_event_log_storage(),
compute_log_manager=LocalComputeLogManager(temp_dir),
run_launcher=CliApiRunLauncher(),
schedule_storage=TestPostgresInstance.clean_schedule_storage(),
)
try:
yield instance
finally:
instance.run_launcher.join()

return MarkedManager(
_postgres_instance_with_cli_api_hijack,
Expand Down
59 changes: 57 additions & 2 deletions python_modules/dagster/dagster/utils/test/postgres_instance.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import os
import subprocess
from contextlib import contextmanager

from dagster import check

BUILDKITE = bool(os.getenv('BUILDKITE'))


class TestPostgresInstance:
@staticmethod
Expand All @@ -12,8 +18,6 @@ def dagster_postgres_installed():

@staticmethod
def get_hostname(env_name='POSTGRES_TEST_DB_HOST'):
import os

# In buildkite we get the ip address from this variable (see buildkite code for commentary)
# Otherwise assume local development and assume localhost
return os.environ.get(env_name, 'localhost')
Expand Down Expand Up @@ -77,3 +81,54 @@ def clean_schedule_storage():
storage = PostgresScheduleStorage.create_clean_storage(TestPostgresInstance.conn_string())
assert storage
return storage

@staticmethod
@contextmanager
def docker_service_up(docker_compose_file, service_name):
check.invariant(
TestPostgresInstance.dagster_postgres_installed(),
'dagster_postgres must be installed to test with postgres',
)
check.str_param(service_name, 'service_name')
check.str_param(docker_compose_file, 'docker_compose_file')
check.invariant(
os.path.isfile(docker_compose_file), 'docker_compose_file must specify a valid file'
)

from dagster_postgres.utils import wait_for_connection # pylint: disable=import-error

if BUILDKITE:
yield TestPostgresInstance.conn_string() # buildkite docker is handled in pipeline setup
return

if not is_postgres_running(service_name):
try:
subprocess.check_output(
['docker-compose', '-f', docker_compose_file, 'stop', service_name]
)
subprocess.check_output(
['docker-compose', '-f', docker_compose_file, 'rm', '-f', service_name]
)
except Exception: # pylint: disable=broad-except
pass
subprocess.check_output(
['docker-compose', '-f', docker_compose_file, 'up', '-d', service_name]
)

conn_str = TestPostgresInstance.conn_string()
wait_for_connection(conn_str)
yield conn_str


def is_postgres_running(service_name):
check.str_param(service_name, 'service_name')
output = subprocess.check_output(
['docker', 'container', 'ps', '-f', 'name={}'.format(service_name), '-f', 'status=running',]
)
decoded = output.decode()

lines = decoded.split('\n')

# header, one line for container, trailing \n
# if container is found, service_name should appear in the second line of output
return (len(lines) == 3) and (service_name in lines[1])
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dagster_postgres.utils import get_conn_string, wait_for_connection

from dagster.utils import file_relative_path, pushd
from dagster.utils.test.postgres_instance import TestPostgresInstance

BUILDKITE = bool(os.getenv('BUILDKITE'))

Expand All @@ -28,22 +29,10 @@ def is_postgres_running():

@pytest.fixture(scope='function')
def postgres():
if BUILDKITE:
with TestPostgresInstance.docker_service_up(
file_relative_path(__file__, 'docker-compose.yml'), 'test-postgres-db'
):
yield
return

if not is_postgres_running():
with pushd(file_relative_path(__file__, '.')):
try:
subprocess.check_output(['docker-compose', 'stop', 'test-postgres-db'])
subprocess.check_output(['docker-compose', 'rm', '-f', 'test-postgres-db'])
except Exception: # pylint: disable=broad-except
pass
subprocess.check_output(['docker-compose', 'up', '-d', 'test-postgres-db'])

wait_for_connection(_conn_string())

yield


@pytest.fixture(scope='module')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from dagster import DagsterEventType, execute_pipeline, pipeline, seven, solid
from dagster.core.instance import DagsterInstance, InstanceType
from dagster.core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher
from dagster.core.storage.local_compute_log_manager import LocalComputeLogManager
from dagster.core.storage.pipeline_run import PipelineRunStatus
from dagster.core.storage.root import LocalArtifactStorage
Expand Down Expand Up @@ -37,6 +38,7 @@ def test_postgres_instance(multi_postgres):
run_storage=run_storage,
event_storage=event_storage,
compute_log_manager=LocalComputeLogManager(temp_dir),
run_launcher=SyncInMemoryRunLauncher(),
)

result = execute_pipeline(simple, instance=instance)
Expand Down

0 comments on commit a50f780

Please sign in to comment.