Skip to content

Commit 6d7ebce

Browse files
authored
Merge pull request datafold#329 from datafold/dec2_cleanup
Dec2 cleanup
2 parents b6b4618 + 3432649 commit 6d7ebce

23 files changed

+62
-102
lines changed

data_diff/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from .table_segment import TableSegment
1010
from .utils import eval_name_template
1111

12+
1213
def connect_to_table(
1314
db_info: Union[str, dict],
1415
table_name: Union[DbPath, str],

data_diff/__main__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ def __init__(self, **kwargs):
8181
def write_usage(self, prog: str, args: str = "", prefix: Optional[str] = None) -> None:
8282
self.write(f"data-diff v{__version__} - efficiently diff rows across database tables.\n\n")
8383
self.write("Usage:\n")
84-
self.write(f" * In-db diff: {prog} <database1> <table1> <table2> [OPTIONS]\n")
85-
self.write(f" * Cross-db diff: {prog} <database1> <table1> <database2> <table2> [OPTIONS]\n")
84+
self.write(f" * In-db diff: {prog} <database_a> <table_a> <table_b> [OPTIONS]\n")
85+
self.write(f" * Cross-db diff: {prog} <database_a> <table_a> <database_b> <table_b> [OPTIONS]\n")
8686
self.write(f" * Using config: {prog} --conf PATH [--run NAME] [OPTIONS]\n")
8787

8888

data_diff/databases/_connect.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
from data_diff.sqeleton.databases import Connect, Database
21
import logging
32

3+
from data_diff.sqeleton.databases import Connect
4+
45
from .postgresql import PostgreSQL
56
from .mysql import MySQL
67
from .oracle import Oracle

data_diff/diff_tables.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from enum import Enum
88
from contextlib import contextmanager
99
from operator import methodcaller
10-
from typing import Dict, Iterable, Tuple, Iterator, Optional
10+
from typing import Dict, Tuple, Iterator, Optional
1111
from concurrent.futures import ThreadPoolExecutor, as_completed
1212

1313
from runtype import dataclass

data_diff/info_tree.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
from typing import Tuple, List, Dict
2-
from itertools import chain
1+
from typing import List, Dict
32

43
from runtype import dataclass
54

data_diff/sqeleton/abcs/database_types.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -162,23 +162,19 @@ def ROUNDS_ON_PREC_LOSS(self) -> bool:
162162
@abstractmethod
163163
def quote(self, s: str):
164164
"Quote SQL name"
165-
...
166165

167166
@abstractmethod
168167
def concat(self, items: List[str]) -> str:
169168
"Provide SQL for concatenating a bunch of columns into a string"
170-
...
171169

172170
@abstractmethod
173171
def is_distinct_from(self, a: str, b: str) -> str:
174172
"Provide SQL for a comparison where NULL = NULL is true"
175-
...
176173

177174
@abstractmethod
178175
def to_string(self, s: str) -> str:
179176
# TODO rewrite using cast_to(x, str)
180177
"Provide SQL for casting a column to string"
181-
...
182178

183179
@abstractmethod
184180
def random(self) -> str:
@@ -191,17 +187,14 @@ def current_timestamp(self) -> str:
191187
@abstractmethod
192188
def offset_limit(self, offset: Optional[int] = None, limit: Optional[int] = None):
193189
"Provide SQL fragment for limit and offset inside a select"
194-
...
195190

196191
@abstractmethod
197192
def explain_as_text(self, query: str) -> str:
198193
"Provide SQL for explaining a query, returned as table(varchar)"
199-
...
200194

201195
@abstractmethod
202196
def timestamp_value(self, t: datetime) -> str:
203197
"Provide SQL for the given timestamp value"
204-
...
205198

206199
@abstractmethod
207200
def set_timezone_to_utc(self) -> str:
@@ -239,7 +232,6 @@ def CONNECT_URI_PARAMS(self) -> List[str]:
239232
@abstractmethod
240233
def _query(self, sql_code: str) -> list:
241234
"Send query to database and return result"
242-
...
243235

244236
@abstractmethod
245237
def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
@@ -249,17 +241,14 @@ def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
249241
Note: This method exists instead of select_table_schema(), just because not all databases support
250242
accessing the schema using a SQL query.
251243
"""
252-
...
253244

254245
@abstractmethod
255246
def select_table_unique_columns(self, path: DbPath) -> str:
256247
"Provide SQL for selecting the names of unique columns in the table"
257-
...
258248

259249
@abstractmethod
260250
def query_table_unique_columns(self, path: DbPath) -> List[str]:
261251
"""Query the table for its unique columns for table in 'path', and return {column}"""
262-
...
263252

264253
@abstractmethod
265254
def _process_table_schema(
@@ -277,21 +266,10 @@ def _process_table_schema(
277266
@abstractmethod
278267
def parse_table_name(self, name: str) -> DbPath:
279268
"Parse the given table name into a DbPath"
280-
...
281269

282270
@abstractmethod
283271
def close(self):
284272
"Close connection(s) to the database instance. Querying will stop functioning."
285-
...
286-
287-
@property
288-
@abstractmethod
289-
def is_closed(self) -> bool:
290-
"Return whether or not the connection has been closed"
291-
292-
@abstractmethod
293-
def _normalize_table_path(self, path: DbPath) -> DbPath:
294-
...
295273

296274
@property
297275
@abstractmethod

data_diff/sqeleton/abcs/mixins.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
1414
1515
Precision of dates should be rounded up/down according to coltype.rounds
1616
"""
17-
...
1817

1918
@abstractmethod
2019
def normalize_number(self, value: str, coltype: FractionalType) -> str:
@@ -35,17 +34,8 @@ def normalize_number(self, value: str, coltype: FractionalType) -> str:
3534
it's the same as ``numeric_scale``, and for floats, who use binary precision,
3635
it can be calculated as ``log10(2**numeric_precision)``.
3736
"""
38-
...
3937

40-
@abstractmethod
41-
def normalize_uuid(self, value: str, coltype: ColType_UUID) -> str:
42-
"""Creates an SQL expression, that converts 'value' to a normalized uuid.
43-
44-
i.e. just makes sure there is no trailing whitespace.
45-
"""
46-
...
47-
48-
def normalize_boolean(self, value: str, coltype: Boolean) -> str:
38+
def normalize_boolean(self, value: str, _coltype: Boolean) -> str:
4939
"""Creates an SQL expression, that converts 'value' to either '0' or '1'."""
5040
return self.to_string(value)
5141

@@ -88,7 +78,6 @@ class AbstractMixin_MD5(ABC):
8878
@abstractmethod
8979
def md5_as_int(self, s: str) -> str:
9080
"Provide SQL for computing md5 and returning an int"
91-
...
9281

9382

9483
class AbstractMixin_Schema(ABC):

data_diff/sqeleton/databases/base.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,12 @@ def enable_interactive(self):
320320

321321
def select_table_schema(self, path: DbPath) -> str:
322322
"""Provide SQL for selecting the table schema as (name, type, date_prec, num_prec)"""
323-
schema, table = self._normalize_table_path(path)
323+
schema, name = self._normalize_table_path(path)
324324

325325
return (
326326
"SELECT column_name, data_type, datetime_precision, numeric_precision, numeric_scale "
327327
"FROM information_schema.columns "
328-
f"WHERE table_name = '{table}' AND table_schema = '{schema}'"
328+
f"WHERE table_name = '{name}' AND table_schema = '{schema}'"
329329
)
330330

331331
def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
@@ -338,12 +338,12 @@ def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
338338
return d
339339

340340
def select_table_unique_columns(self, path: DbPath) -> str:
341-
schema, table = self._normalize_table_path(path)
341+
schema, name = self._normalize_table_path(path)
342342

343343
return (
344344
"SELECT column_name "
345345
"FROM information_schema.key_column_usage "
346-
f"WHERE table_name = '{table}' AND table_schema = '{schema}'"
346+
f"WHERE table_name = '{name}' AND table_schema = '{schema}'"
347347
)
348348

349349
def query_table_unique_columns(self, path: DbPath) -> List[str]:
@@ -431,7 +431,7 @@ def _query_cursor(self, c, sql_code: str):
431431
c.execute(sql_code)
432432
if sql_code.lower().startswith(("select", "explain", "show")):
433433
return c.fetchall()
434-
except Exception as e:
434+
except Exception as _e:
435435
# logger.exception(e)
436436
# logger.error(f'Caused by SQL: {sql_code}')
437437
raise
@@ -478,7 +478,6 @@ def _query_in_worker(self, sql_code: Union[str, ThreadLocalInterpreter]):
478478
@abstractmethod
479479
def create_connection(self):
480480
"Return a connection instance, that supports the .cursor() method."
481-
...
482481

483482
def close(self):
484483
super().close()

data_diff/sqeleton/databases/bigquery.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
4949
def normalize_number(self, value: str, coltype: FractionalType) -> str:
5050
return f"format('%.{coltype.precision}f', {value})"
5151

52-
def normalize_boolean(self, value: str, coltype: Boolean) -> str:
52+
def normalize_boolean(self, value: str, _coltype: Boolean) -> str:
5353
return self.to_string(f"cast({value} as int)")
5454

5555

@@ -145,11 +145,12 @@ def close(self):
145145
self._client.close()
146146

147147
def select_table_schema(self, path: DbPath) -> str:
148-
schema, table = self._normalize_table_path(path)
148+
schema, name = self._normalize_table_path(path)
149149

150150
return (
151-
f"SELECT column_name, data_type, 6 as datetime_precision, 38 as numeric_precision, 9 as numeric_scale FROM {schema}.INFORMATION_SCHEMA.COLUMNS "
152-
f"WHERE table_name = '{table}' AND table_schema = '{schema}'"
151+
"SELECT column_name, data_type, 6 as datetime_precision, 38 as numeric_precision, 9 as numeric_scale "
152+
f"FROM {schema}.INFORMATION_SCHEMA.COLUMNS "
153+
f"WHERE table_name = '{name}' AND table_schema = '{schema}'"
153154
)
154155

155156
def query_table_unique_columns(self, path: DbPath) -> List[str]:

data_diff/sqeleton/databases/clickhouse.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
ThreadedDatabase,
99
import_helper,
1010
ConnectError,
11-
DbTime,
1211
)
1312
from ..abcs.database_types import (
1413
ColType,
@@ -23,7 +22,9 @@
2322
)
2423
from ..abcs.mixins import AbstractMixin_MD5, AbstractMixin_NormalizeValue
2524

26-
DEFAULT_DATABASE = "default" # https://clickhouse.com/docs/en/operations/server-configuration-parameters/settings/#default-database
25+
# https://clickhouse.com/docs/en/operations/server-configuration-parameters/settings/#default-database
26+
DEFAULT_DATABASE = "default"
27+
2728

2829
@import_helper("clickhouse")
2930
def import_clickhouse():

data_diff/sqeleton/databases/connect.py

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
from typing import Type, List, Optional, Union, Dict
1+
from typing import Type, Optional, Union, Dict
22
from itertools import zip_longest
3-
import dsnparse
43
from contextlib import suppress
4+
import dsnparse
55

66
from runtype import dataclass
77

@@ -24,24 +24,17 @@
2424
@dataclass
2525
class MatchUriPath:
2626
database_cls: Type[Database]
27-
params: List[str]
28-
kwparams: List[str] = []
29-
help_str: str = "<unspecified>"
30-
31-
def __post_init__(self):
32-
assert self.params == self.database_cls.CONNECT_URI_PARAMS, self.params
33-
assert self.help_str == self.database_cls.CONNECT_URI_HELP, "\n%s\n%s" % (
34-
self.help_str,
35-
self.database_cls.CONNECT_URI_HELP,
36-
)
37-
assert self.kwparams == self.database_cls.CONNECT_URI_KWPARAMS
3827

3928
def match_path(self, dsn):
29+
help_str = self.database_cls.CONNECT_URI_HELP
30+
params = self.database_cls.CONNECT_URI_PARAMS
31+
kwparams = self.database_cls.CONNECT_URI_KWPARAMS
32+
4033
dsn_dict = dict(dsn.query)
4134
matches = {}
42-
for param, arg in zip_longest(self.params, dsn.paths):
35+
for param, arg in zip_longest(params, dsn.paths):
4336
if param is None:
44-
raise ValueError(f"Too many parts to path. Expected format: {self.help_str}")
37+
raise ValueError(f"Too many parts to path. Expected format: {help_str}")
4538

4639
optional = param.endswith("?")
4740
param = param.rstrip("?")
@@ -51,26 +44,26 @@ def match_path(self, dsn):
5144
arg = dsn_dict.pop(param)
5245
except KeyError:
5346
if not optional:
54-
raise ValueError(f"URI must specify '{param}'. Expected format: {self.help_str}")
47+
raise ValueError(f"URI must specify '{param}'. Expected format: {help_str}")
5548

5649
arg = None
5750

5851
assert param and param not in matches
5952
matches[param] = arg
6053

61-
for param in self.kwparams:
54+
for param in kwparams:
6255
try:
6356
arg = dsn_dict.pop(param)
6457
except KeyError:
65-
raise ValueError(f"URI must specify '{param}'. Expected format: {self.help_str}")
58+
raise ValueError(f"URI must specify '{param}'. Expected format: {help_str}")
6659

6760
assert param and arg and param not in matches, (param, arg, matches.keys())
6861
matches[param] = arg
6962

7063
for param, value in dsn_dict.items():
7164
if param in matches:
7265
raise ValueError(
73-
f"Parameter '{param}' already provided as positional argument. Expected format: {self.help_str}"
66+
f"Parameter '{param}' already provided as positional argument. Expected format: {help_str}"
7467
)
7568

7669
matches[param] = value
@@ -99,10 +92,7 @@ class Connect:
9992

10093
def __init__(self, database_by_scheme: Dict[str, Database]):
10194
self.database_by_scheme = database_by_scheme
102-
self.match_uri_path = {
103-
name: MatchUriPath(cls, cls.CONNECT_URI_PARAMS, cls.CONNECT_URI_KWPARAMS, help_str=cls.CONNECT_URI_HELP)
104-
for name, cls in database_by_scheme.items()
105-
}
95+
self.match_uri_path = {name: MatchUriPath(cls) for name, cls in database_by_scheme.items()}
10696
self.conn_cache = WeakCache()
10797

10898
def connect_to_uri(self, db_uri: str, thread_count: Optional[int] = 1) -> Database:

data_diff/sqeleton/databases/databricks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def normalize_number(self, value: str, coltype: NumericType) -> str:
4848
value = f"format_number({value}, {coltype.precision})"
4949
return f"replace({self.to_string(value)}, ',', '')"
5050

51-
def normalize_boolean(self, value: str, coltype: Boolean) -> str:
51+
def normalize_boolean(self, value: str, _coltype: Boolean) -> str:
5252
return self.to_string(f"cast ({value} as int)")
5353

5454

data_diff/sqeleton/databases/duckdb.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
5050
def normalize_number(self, value: str, coltype: FractionalType) -> str:
5151
return self.to_string(f"{value}::DECIMAL(38, {coltype.precision})")
5252

53-
def normalize_boolean(self, value: str, coltype: Boolean) -> str:
53+
def normalize_boolean(self, value: str, _coltype: Boolean) -> str:
5454
return self.to_string(f"{value}::INTEGER")
5555

5656

@@ -114,7 +114,7 @@ def set_timezone_to_utc(self) -> str:
114114

115115
class DuckDB(Database):
116116
dialect = Dialect()
117-
SUPPORTS_UNIQUE_CONSTAINT = False # XXX Temporary, until implemented
117+
SUPPORTS_UNIQUE_CONSTAINT = False # Temporary, until we implement it
118118
default_schema = "main"
119119
CONNECT_URI_HELP = "duckdb://<database>@<dbpath>"
120120
CONNECT_URI_PARAMS = ["database", "dbpath"]

data_diff/sqeleton/databases/oracle.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from ..abcs.mixins import AbstractMixin_MD5, AbstractMixin_NormalizeValue, AbstractMixin_Schema
1818
from ..abcs import Compilable
1919
from ..queries import this, table, SKIP
20-
from .base import BaseDialect, ThreadedDatabase, import_helper, ConnectError, QueryError, Mixin_Schema
20+
from .base import BaseDialect, ThreadedDatabase, import_helper, ConnectError, QueryError
2121
from .base import TIMESTAMP_PRECISION_POS
2222

2323
SESSION_TIME_ZONE = None # Changed by the tests
@@ -182,9 +182,9 @@ def _query_cursor(self, c, sql_code: str):
182182
raise QueryError(e)
183183

184184
def select_table_schema(self, path: DbPath) -> str:
185-
schema, table = self._normalize_table_path(path)
185+
schema, name = self._normalize_table_path(path)
186186

187187
return (
188188
f"SELECT column_name, data_type, 6 as datetime_precision, data_precision as numeric_precision, data_scale as numeric_scale"
189-
f" FROM ALL_TAB_COLUMNS WHERE table_name = '{table}' AND owner = '{schema}'"
189+
f" FROM ALL_TAB_COLUMNS WHERE table_name = '{name}' AND owner = '{schema}'"
190190
)

0 commit comments

Comments
 (0)