Skip to content

Commit

Permalink
DBSync: Create Table from DDL (move-coop#492)
Browse files Browse the repository at this point in the history
* dbsync: add retries and configurable chunking

This commit makes a few fixes to the Parsons DBSync class
to improve the performance and stability of the functionality.

 * Add a `retries` argument to the `DBSync` class to configure
 the number of times the sync will retry reading / writing data
 if there is a failure. This should make the sync process less
 brittle.
 * Add `read_chunk_size` and `write_chunk_size` arguments to
 the `DBSync` class to allow users to configure how much data is
 downloaded from the source DB or written to the destination DB
 at a time.
 * Added a `strict_length` argument to the Redshift `copy_s3` and
 all DB `copy` functions that allows users to opt into or out of new
 functionality that, when creating tables, uses a step function to
 determine what length to specify on a `varchar` column.

* PR comments

* Initial poc.

* Add in doc strings and Redshift, MySQL.

* Lint + Requirements.

* Skeleton tests.

* Lint.

* Lint.

Co-authored-by: Eliot Stone <[email protected]>
  • Loading branch information
jburchard and Eliot Stone authored Feb 24, 2021
1 parent 29e79a7 commit 94608e6
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 3 deletions.
80 changes: 80 additions & 0 deletions parsons/databases/alchemy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from sqlalchemy import create_engine, Table, MetaData
import logging

logger = logging.getLogger(__name__)


class Alchemy:
    """
    Mixin providing SQLAlchemy helpers for Parsons database classes.

    Expects the host class to define ``dialect``, ``username``,
    ``password``, ``host``, ``port`` and ``db`` attributes, which are
    used to build the connection URL.
    """

    def generate_engine(self):
        """
        Generate a SQL Alchemy engine.
        """

        alchemy_url = self.generate_alchemy_url()
        # NOTE(review): convert_unicode is valid for SQLAlchemy 1.3 (the
        # version pinned in requirements.txt) but was removed in 1.4+.
        return create_engine(alchemy_url, echo=False, convert_unicode=True)

    def generate_alchemy_url(self):
        """
        Generate a SQL Alchemy connection URL from instance credentials.
        https://docs.sqlalchemy.org/en/14/core/engines.html#
        """

        if self.dialect in ('redshift', 'postgres'):
            connection_schema = 'postgresql+psycopg2'
        elif self.dialect == 'mysql':
            connection_schema = 'mysql+mysqlconnector'
        else:
            # Fail fast with a clear error instead of hitting a NameError
            # below when the dialect is unsupported.
            raise ValueError(f"Unsupported dialect: {self.dialect}")

        # Each (value, formatted) pair is appended only when the value
        # is set, so partial credentials still produce a valid URL.
        params = [(self.username, self.username),
                  (self.password, f':{self.password}'),
                  (self.host, f'@{self.host}'),
                  (self.port, f':{self.port}'),
                  (self.db, f'/{self.db}')]

        url = f'{connection_schema}://'

        for value, formatted in params:
            if value:
                url += formatted

        return url

    def get_table_object(self, table_name):
        """
        Get a SQL Alchemy table object, reflecting the existing table.
        """

        schema, table_name = self.split_table_name(table_name)
        db_meta = MetaData(bind=self.generate_engine(), schema=schema)
        return Table(table_name, db_meta, autoload=True)

    def create_table(self, table_object, table_name):
        """
        Create a table in this database based on table object data.
        """

        schema, table_name = self.split_table_name(table_name)

        if schema:
            table_object.schema = schema
        if table_name:
            # SQLAlchemy Table objects expose the table's name as ``name``;
            # the previous assignment to ``table_name`` set an unused
            # attribute and left the original name in place.
            table_object.name = table_name

        table_object.metadata.create_all(self.generate_engine())

    @staticmethod
    def split_table_name(full_table_name):
        """
        Utility method to parse the schema and table name.

        Defaults the schema to "public" when no schema is given; raises
        ``ValueError`` for names with more than one ``.`` separator.
        """

        if "." not in full_table_name:
            return "public", full_table_name

        try:
            schema, table = full_table_name.split(".")
        except ValueError as e:
            # Always re-raise (with chaining) rather than matching on the
            # exception message; the old message check could let the error
            # fall through to a NameError on the return below.
            raise ValueError(f"Invalid database table {full_table_name}") from e

        return schema, table
13 changes: 13 additions & 0 deletions parsons/databases/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ def table_sync_full(self, source_table, destination_table, if_exists='drop',
logger.info('Source table contains 0 rows')
return None

# Create the table, if needed.
if not destination_tbl.exists:
self.create_table(source_table, destination_table)

copied_rows = self.copy_rows(source_table, destination_table, None,
order_by, **kwargs)

Expand Down Expand Up @@ -285,3 +289,12 @@ def _row_count_verify(self, source_table_obj, destination_table_obj):

logger.info('Source and destination table row counts match.')
return True

def create_table(self, source_table, destination_table):
    """
    Create an empty destination table mirroring the source table's schema.

    Relies on the Alchemy mixin on both connectors: the source database
    supplies a SQLAlchemy table object, which the destination database
    then uses to issue the CREATE TABLE DDL.
    """

    table_object = self.source_db.get_table_object(source_table)
    self.dest_db.create_table(table_object, destination_table)
3 changes: 2 additions & 1 deletion parsons/databases/mysql/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import os
from parsons.databases.table import BaseTable
from parsons.databases.mysql.create_table import MySQLCreateTable
from parsons.databases.alchemy import Alchemy

# Max number of rows that we query at a time, so we can avoid loading huge
# data sets into memory.
Expand All @@ -18,7 +19,7 @@
logger = logging.getLogger(__name__)


class MySQL(MySQLCreateTable):
class MySQL(MySQLCreateTable, Alchemy):
"""
Connect to a MySQL database.
Expand Down
3 changes: 2 additions & 1 deletion parsons/databases/postgres/postgres.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from parsons.databases.postgres.postgres_core import PostgresCore
from parsons.databases.table import BaseTable
from parsons.databases.alchemy import Alchemy
import logging
import os


logger = logging.getLogger(__name__)


class Postgres(PostgresCore):
class Postgres(PostgresCore, Alchemy):
"""
A Postgres class to connect to database. Credentials can be passed from a ``.pgpass`` file
stored in your home directory or with environmental variables.
Expand Down
4 changes: 3 additions & 1 deletion parsons/databases/redshift/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from parsons.databases.redshift.rs_table_utilities import RedshiftTableUtilities
from parsons.databases.redshift.rs_schema import RedshiftSchema
from parsons.databases.table import BaseTable
from parsons.databases.alchemy import Alchemy
from parsons.utilities import files
import psycopg2
import psycopg2.extras
Expand All @@ -24,7 +25,8 @@
logger = logging.getLogger(__name__)


class Redshift(RedshiftCreateTable, RedshiftCopyTable, RedshiftTableUtilities, RedshiftSchema):
class Redshift(RedshiftCreateTable, RedshiftCopyTable, RedshiftTableUtilities, RedshiftSchema,
Alchemy):
"""
A Redshift class to connect to database.
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ azure-storage-blob==12.3.2
PyGitHub==1.51
surveygizmo==1.2.3
PyJWT==2.0.1 # Otherwise `import jwt` would refer to python-jwt package
SQLAlchemy==1.3.23

# Testing Requirements
requests-mock==1.5.2
Expand Down
8 changes: 8 additions & 0 deletions test/test_databases/fakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ def copy(self, data, table_name, **kwargs):

tbl.data.concat(data)

def get_table_object(self, table_name):
    """No-op stub of Alchemy.get_table_object for the fake database; returns None."""

    pass

def create_table(self, table_object, table_name):
    """No-op stub of Alchemy.create_table for the fake database; creates nothing."""

    pass


class FakeTable:
def __init__(self, table_name, data):
Expand Down

0 comments on commit 94608e6

Please sign in to comment.