From cf9f77e746196f95afdae87e0773d23d79d0b285 Mon Sep 17 00:00:00 2001 From: jburchard Date: Thu, 12 Mar 2020 11:44:23 -0500 Subject: [PATCH] MySQL Connector: Query (#176) * Initial push. * Add to docs. * Clean up. * Remove comment. --- docs/databases.rst | 7 ++ parsons/__init__.py | 4 +- parsons/databases/mysql/__init__.py | 5 + parsons/databases/mysql/mysql.py | 187 ++++++++++++++++++++++++++++ requirements.txt | 1 + test/test_databases/test_mysql.py | 49 ++++++++ 6 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 parsons/databases/mysql/__init__.py create mode 100644 parsons/databases/mysql/mysql.py create mode 100644 test/test_databases/test_mysql.py diff --git a/docs/databases.rst b/docs/databases.rst index 12959d5b8..92f754469 100644 --- a/docs/databases.rst +++ b/docs/databases.rst @@ -7,6 +7,13 @@ Google BigQuery See :doc:`google_cloud` for documentation. +***** +MySQL +***** + +.. autoclass:: parsons.MySQL + :inherited-members: + ******** Postgres ******** diff --git a/parsons/__init__.py b/parsons/__init__.py index 91673c1ff..db06ad2e4 100644 --- a/parsons/__init__.py +++ b/parsons/__init__.py @@ -38,6 +38,7 @@ from parsons.databases.postgres.postgres import Postgres from parsons.freshdesk.freshdesk import Freshdesk from parsons.bill_com.bill_com import BillCom + from parsons.databases.mysql.mysql import MySQL __all__ = [ 'VAN', @@ -69,7 +70,8 @@ 'Salesforce', 'Postgres', 'Freshdesk', - 'BillCom' + 'BillCom', + 'MySQL' ] # Define the default logging config for Parsons and its submodules. For now the diff --git a/parsons/databases/mysql/__init__.py b/parsons/databases/mysql/__init__.py new file mode 100644 index 000000000..b090a41f7 --- /dev/null +++ b/parsons/databases/mysql/__init__.py @@ -0,0 +1,5 @@ +from parsons.databases.mysql.mysql import MySQL + +__all__ = [ + 'MySQL' +] diff --git a/parsons/databases/mysql/mysql.py b/parsons/databases/mysql/mysql.py new file mode 100644 index 000000000..e96a5dc5e --- /dev/null +++ b/parsons/databases/mysql/mysql.py @@ -0,0 +1,187 @@ +from parsons import Table +from parsons.utilities import check_env +import petl +import mysql.connector as mysql +from contextlib import contextmanager +from parsons.utilities import files +import pickle +import logging +import os + +# Max number of rows that we query at a time, so we can avoid loading huge +# data sets into memory. +# 100k rows per batch at ~1k bytes each = ~100MB per batch. +QUERY_BATCH_SIZE = 100000 + +logger = logging.getLogger(__name__) + + +class MySQL(): + """ + Connect to a MySQL database. + + `Args:` + username: str + Required if env variable ``MYSQL_USERNAME`` not populated + password: str + Required if env variable ``MYSQL_PASSWORD`` not populated + host: str + Required if env variable ``MYSQL_HOST`` not populated + db: str + Required if env variable ``MYSQL_DB`` not populated + port: int + Can be set by env variable ``MYSQL_PORT`` or argument. + """ + + def __init__(self, host=None, username=None, password=None, db=None, port=3306): + + self.username = check_env.check('MYSQL_USERNAME', username) + self.password = check_env.check('MYSQL_PASSWORD', password) + self.host = check_env.check('MYSQL_HOST', host) + self.db = check_env.check('MYSQL_DB', db) + self.port = port or os.environ.get('MYSQL_PORT') + + @contextmanager + def connection(self): + """ + Generate a MySQL connection. The connection is set up as a python "context manager", so + it will be closed automatically (and all queries committed) when the connection goes out + of scope. + + When using the connection, make sure to put it in a ``with`` block (necessary for + any context manager): + ``with mysql.connection() as conn:`` + + `Returns:` + MySQL `connection` object + """ + + # Create a mysql connection and cursor + connection = mysql.connect(host=self.host, + user=self.username, + passwd=self.password, + database=self.db, + port=self.port) + + try: + yield connection + except mysql.Error: + connection.rollback() + raise + else: + connection.commit() + finally: + connection.close() + + @contextmanager + def cursor(self, connection): + cur = connection.cursor(buffered=True) + + try: + yield cur + finally: + cur.close() + + def query(self, sql, parameters=None): + """ + Execute a query against the database. Will return ``None``if the query returns zero rows. + + To include python variables in your query, it is recommended to pass them as parameters, + following the `mysql style `_. + Using the ``parameters`` argument ensures that values are escaped properly, and avoids SQL + injection attacks. + + **Parameter Examples** + + .. code-block:: python + + # Note that the name contains a quote, which could break your query if not escaped + # properly. + name = "Beatrice O'Brady" + sql = "SELECT * FROM my_table WHERE name = %s" + mysql.query(sql, parameters=[name]) + + .. code-block:: python + + names = ["Allen Smith", "Beatrice O'Brady", "Cathy Thompson"] + placeholders = ', '.join('%s' for item in names) + sql = f"SELECT * FROM my_table WHERE name IN ({placeholders})" + mysql.query(sql, parameters=names) + + `Args:` + sql: str + A valid SQL statement + parameters: list + A list of python variables to be converted into SQL values in your query + + `Returns:` + Parsons Table + See :ref:`parsons-table` for output options. + + """ # noqa: E501 + + with self.connection() as connection: + return self.query_with_connection(sql, connection, parameters=parameters) + + def query_with_connection(self, sql, connection, parameters=None, commit=True): + """ + Execute a query against the database, with an existing connection. Useful for batching + queries together. Will return ``None`` if the query returns zero rows. + + `Args:` + sql: str + A valid SQL statement + connection: obj + A connection object obtained from ``mysql.connection()`` + parameters: list + A list of python variables to be converted into SQL values in your query + commit: boolean + Whether to commit the transaction immediately. If ``False`` the transaction will + be committed when the connection goes out of scope and is closed (or you can + commit manually with ``connection.commit()``). + + `Returns:` + Parsons Table + See :ref:`parsons-table` for output options. + """ + with self.cursor(connection) as cursor: + + # The python connector can only execute a single sql statement, so we will + # break up each statement and execute them separately. + for s in sql.strip().split(';'): + if len(s) != 0: + logger.debug(f'SQL Query: {sql}') + cursor.execute(s, parameters) + + if commit: + connection.commit() + + # If the SQL query provides no response, then return None + if not cursor.description: + logger.debug('Query returned 0 rows') + return None + + else: + # Fetch the data in batches, and "pickle" the rows to a temp file. + # (We pickle rather than writing to, say, a CSV, so that we maintain + # all the type information for each field.) + temp_file = files.create_temp_file() + + with open(temp_file, 'wb') as f: + # Grab the header + pickle.dump(cursor.column_names, f) + + while True: + batch = cursor.fetchmany(QUERY_BATCH_SIZE) + if len(batch) == 0: + break + + logger.debug(f'Fetched {len(batch)} rows.') + for row in batch: + pickle.dump(row, f) + + # Load a Table from the file + final_tbl = Table(petl.frompickle(temp_file)) + + logger.debug(f'Query returned {final_tbl.num_rows} rows.') + return final_tbl diff --git a/requirements.txt b/requirements.txt index c15d9ef4d..c8764b24b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,6 +28,7 @@ simplejson==3.16.0 twilio==6.30.0 simple-salesforce==0.74.3 suds-py3==1.3.4.0 +mysql-connector-python==8.0.19 # Testing Requirements requests-mock==1.5.2 diff --git a/test/test_databases/test_mysql.py b/test/test_databases/test_mysql.py new file mode 100644 index 000000000..e17446f10 --- /dev/null +++ b/test/test_databases/test_mysql.py @@ -0,0 +1,49 @@ +from parsons.databases.mysql.mysql import MySQL +from parsons.etl.table import Table +from test.utils import assert_matching_tables +import unittest +import os + +# The name of the schema and will be temporarily created for the tests +TEMP_SCHEMA = 'parsons_test' + + +# These tests interact directly with the Postgres database. To run, set env variable "LIVE_TEST=True" +@unittest.skipIf(not os.environ.get('LIVE_TEST'), 'Skipping because not running live test') +class TestMySQL(unittest.TestCase): + + def setUp(self): + + self.temp_schema = TEMP_SCHEMA + self.mysql = MySQL() + + def tearDown(self): + + # Drop the view, the table and the schema + sql = f"DROP TABLE IF EXISTS test;" + self.mysql.query(sql) + + def test_query(self): + + # Check that query sending back expected result + r = self.mysql.query("SELECT 1") + self.assertEqual(r.first, 1) + + def test_query_no_response(self): + + # Check that a query that has no response and doesn't fail + sql = f"CREATE TABLE test (name VARCHAR(255), user_name VARCHAR(255))" + r = self.mysql.query(sql) + self.assertEqual(r, None) + + def test_insert_data(self): + + sql = "CREATE TABLE test (name VARCHAR(255), user_name VARCHAR(255));" + self.mysql.query(sql) + + sql = "INSERT INTO test (name, user_name) VALUES ('me', 'myuser');" + self.mysql.query(sql) + + r = self.mysql.query("select * from test") + + assert_matching_tables(Table([{'name': 'me', 'user_name': 'myuser'}]), r) \ No newline at end of file