Skip to content

Commit 92c6274

Browse files
authored
Merge pull request datafold#326 from datafold/dec1
diff_tables() now accepts all JoinDiffer params
2 parents 85f5610 + 78b725d commit 92c6274

File tree

10 files changed

+65
-37
lines changed

10 files changed

+65
-37
lines changed

data_diff/__init__.py

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
from .sqeleton.abcs import DbKey, DbTime, DbPath
66
from .diff_tables import Algorithm
77
from .hashdiff_tables import HashDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
8-
from .joindiff_tables import JoinDiffer
8+
from .joindiff_tables import JoinDiffer, TABLE_WRITE_LIMIT
99
from .table_segment import TableSegment
10+
from .utils import eval_name_template
1011

1112
__version__ = "0.3.0rc4"
1213

14+
1315
def connect_to_table(
1416
db_info: Union[str, dict],
1517
table_name: Union[DbPath, str],
@@ -55,17 +57,27 @@ def diff_tables(
5557
# Start/end update_column values, used to restrict the segment
5658
min_update: DbTime = None,
5759
max_update: DbTime = None,
58-
# Algorithm
59-
algorithm: Algorithm = Algorithm.HASHDIFF,
60-
# Into how many segments to bisect per iteration (hashdiff only)
61-
bisection_factor: int = DEFAULT_BISECTION_FACTOR,
62-
# When should we stop bisecting and compare locally (in row count; hashdiff only)
63-
bisection_threshold: int = DEFAULT_BISECTION_THRESHOLD,
6460
# Enable/disable threaded diffing. Needed to take advantage of database threads.
6561
threaded: bool = True,
6662
# Maximum size of each threadpool. None = auto. Only relevant when threaded is True.
6763
# There may be many pools, so number of actual threads can be a lot higher.
6864
max_threadpool_size: Optional[int] = 1,
65+
# Algorithm
66+
algorithm: Algorithm = Algorithm.AUTO,
67+
# Into how many segments to bisect per iteration (hashdiff only)
68+
bisection_factor: int = DEFAULT_BISECTION_FACTOR,
69+
# When should we stop bisecting and compare locally (in row count; hashdiff only)
70+
bisection_threshold: int = DEFAULT_BISECTION_THRESHOLD,
71+
# Enable/disable validating that the key columns are unique. (joindiff only)
72+
validate_unique_key: bool = True,
73+
# Enable/disable sampling of exclusive rows. Creates a temporary table. (joindiff only)
74+
sample_exclusive_rows: bool = False,
75+
# Path of new table to write diff results to. Disabled if not provided. (joindiff only)
76+
materialize_to_table: Union[str, DbPath] = None,
77+
# Materialize every row, not just those that are different. (joindiff only)
78+
materialize_all_rows: bool = False,
79+
# Maximum number of rows to write when materializing, per thread. (joindiff only)
80+
table_write_limit: int = TABLE_WRITE_LIMIT,
6981
) -> Iterator:
7082
"""Finds the diff between table1 and table2.
7183
@@ -78,14 +90,21 @@ def diff_tables(
7890
max_key (:data:`DbKey`, optional): Highest key value, used to restrict the segment
7991
min_update (:data:`DbTime`, optional): Lowest update_column value, used to restrict the segment
8092
max_update (:data:`DbTime`, optional): Highest update_column value, used to restrict the segment
81-
algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`)
82-
bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
83-
bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
84-
and compare locally. (Used when algorithm is `HASHDIFF`).
8593
threaded (bool): Enable/disable threaded diffing. Needed to take advantage of database threads.
8694
max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
8795
Only relevant when `threaded` is ``True``.
8896
There may be many pools, so number of actual threads can be a lot higher.
97+
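algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF`, `JOINDIFF` or `AUTO`). Default: `AUTO`, which selects `JOINDIFF` when both tables use the same database connection, and `HASHDIFF` otherwise.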
98+
bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
99+
bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
100+
and compare locally. (Used when algorithm is `HASHDIFF`).
101+
validate_unique_key (bool): Enable/disable validating that the key columns are unique. (used for `JOINDIFF`. default: True)
102+
The validation runs as a single query and can't be threaded, so it's very slow on non-cloud dbs.
103+
Future versions will detect UNIQUE constraints in the schema.
104+
sample_exclusive_rows (bool): Enable/disable sampling of exclusive rows. Creates a temporary table. (used for `JOINDIFF`. default: False)
105+
materialize_to_table (Union[str, DbPath], optional): Path of new table to write diff results to. Disabled if not provided. Used for `JOINDIFF`.
106+
materialize_all_rows (bool): Materialize every row, not just those that are different. (used for `JOINDIFF`. default: False)
107+
table_write_limit (int): Maximum number of rows to write when materializing, per thread. (used for `JOINDIFF`. default: `TABLE_WRITE_LIMIT`)
89108
90109
Note:
91110
The following parameters are used to override the corresponding attributes of the given :class:`TableSegment` instances:
@@ -125,6 +144,9 @@ def diff_tables(
125144
segments = [t.new(**override_attrs) for t in tables] if override_attrs else tables
126145

127146
algorithm = Algorithm(algorithm)
147+
if algorithm == Algorithm.AUTO:
148+
algorithm = Algorithm.JOINDIFF if table1.database is table2.database else Algorithm.HASHDIFF
149+
128150
if algorithm == Algorithm.HASHDIFF:
129151
differ = HashDiffer(
130152
bisection_factor=bisection_factor,
@@ -133,9 +155,16 @@ def diff_tables(
133155
max_threadpool_size=max_threadpool_size,
134156
)
135157
elif algorithm == Algorithm.JOINDIFF:
158+
if isinstance(materialize_to_table, str):
159+
materialize_to_table = table1.database.parse_table_name(eval_name_template(materialize_to_table))
136160
differ = JoinDiffer(
137161
threaded=threaded,
138162
max_threadpool_size=max_threadpool_size,
163+
validate_unique_key=validate_unique_key,
164+
sample_exclusive_rows=sample_exclusive_rows,
165+
materialize_to_table=materialize_to_table,
166+
materialize_all_rows=materialize_all_rows,
167+
table_write_limit=table_write_limit,
139168
)
140169
else:
141170
raise ValueError(f"Unknown algorithm: {algorithm}")

data_diff/__main__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def write_usage(self, prog: str, args: str = "", prefix: Optional[str] = None) -
123123
)
124124
@click.option(
125125
"-m",
126-
"--materialize",
126+
"--materialize-to-table",
127127
default=None,
128128
metavar="TABLE_NAME",
129129
help="(joindiff only) Materialize the diff results into a new table in the database. If a table exists by that name, it will be replaced.",
@@ -248,7 +248,7 @@ def _main(
248248
sample_exclusive_rows,
249249
materialize_all_rows,
250250
table_write_limit,
251-
materialize,
251+
materialize_to_table,
252252
threads1=None,
253253
threads2=None,
254254
__conf__=None,
@@ -340,7 +340,7 @@ def _main(
340340
sample_exclusive_rows=sample_exclusive_rows,
341341
materialize_all_rows=materialize_all_rows,
342342
table_write_limit=table_write_limit,
343-
materialize_to_table=materialize and db1.parse_table_name(eval_name_template(materialize)),
343+
materialize_to_table=materialize_to_table and db1.parse_table_name(eval_name_template(materialize_to_table)),
344344
)
345345
else:
346346
assert algorithm == Algorithm.HASHDIFF

data_diff/joindiff_tables.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,18 @@ class JoinDiffer(TableDiffer):
123123
max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
124124
Only relevant when `threaded` is ``True``.
125125
There may be many pools, so number of actual threads can be a lot higher.
126-
validate_unique_key (bool): Enable/disable validating that the key columns are unique.
127-
Single query, and can't be threaded, so it's very slow on non-cloud dbs.
128-
Future versions will detect UNIQUE constraints in the schema.
129-
sample_exclusive_rows (bool): Enable/disable sampling of exclusive rows. Creates a temporary table.
126+
validate_unique_key (bool): Enable/disable validating that the key columns are unique. (default: True)
127+
If there are no UNIQUE constraints in the schema, it is done in a single query,
128+
and can't be threaded, so it's very slow on non-cloud dbs.
129+
sample_exclusive_rows (bool): Enable/disable sampling of exclusive rows. (default: False)
130+
Creates a temporary table.
130131
materialize_to_table (DbPath, optional): Path of new table to write diff results to. Disabled if not provided.
132+
materialize_all_rows (bool): Materialize every row, not just those that are different. (default: False)
131133
table_write_limit (int): Maximum number of rows to write when materializing, per thread.
132134
"""
133135

134136
validate_unique_key: bool = True
135-
sample_exclusive_rows: bool = True
137+
sample_exclusive_rows: bool = False
136138
materialize_to_table: DbPath = None
137139
materialize_all_rows: bool = False
138140
table_write_limit: int = TABLE_WRITE_LIMIT

data_diff/sqeleton/databases/connect.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from .duckdb import DuckDB
2222

2323

24-
2524
@dataclass
2625
class MatchUriPath:
2726
database_cls: Type[Database]

data_diff/sqeleton/queries/ast_classes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,7 @@ class CurrentTimestamp(ExprNode):
786786
def compile(self, c: Compiler) -> str:
787787
return c.dialect.current_timestamp()
788788

789+
789790
# DDL
790791

791792

data_diff/sqeleton/queries/compiler.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
cv_params = contextvars.ContextVar("params")
1313

14+
1415
class Root:
1516
"Nodes inheriting from Root can be used as root statements in SQL (e.g. SELECT yes, RANDOM() no)"
1617

@@ -38,6 +39,7 @@ def compile(self, elem, params=None) -> str:
3839

3940
if self.root and isinstance(elem, Compilable) and not isinstance(elem, Root):
4041
from .ast_classes import Select
42+
4143
elem = Select(columns=[elem])
4244

4345
res = self._compile(elem)

tests/common.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def get_git_revision_short_hash() -> str:
8787
_database_instances = {}
8888

8989

90-
def get_conn(cls: type, shared: bool =True) -> Database:
90+
def get_conn(cls: type, shared: bool = True) -> Database:
9191
if shared:
9292
if cls not in _database_instances:
9393
_database_instances[cls] = get_conn(cls, shared=False)
@@ -181,6 +181,7 @@ def _test_per_database(cls):
181181

182182
return _test_per_database
183183

184+
184185
def table_segment(database, table_path, key_columns, *args, **kw):
185186
if isinstance(key_columns, str):
186187
key_columns = (key_columns,)

tests/sqeleton/test_sql.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ def test_compile_int(self):
1818
self.assertEqual("1", self.compiler.compile(1))
1919

2020
def test_compile_table_name(self):
21-
self.assertEqual("`marine_mammals`.`walrus`", self.compiler.replace(root=False).compile(table("marine_mammals", "walrus")))
21+
self.assertEqual(
22+
"`marine_mammals`.`walrus`", self.compiler.replace(root=False).compile(table("marine_mammals", "walrus"))
23+
)
2224

2325
def test_compile_select(self):
2426
expected_sql = "SELECT name FROM `marine_mammals`.`walrus`"

tests/test_api.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,14 @@ def test_api_get_stats_dict(self):
6767
"updated": 0,
6868
"unchanged": 4,
6969
"total": 1,
70-
"stats": {"rows_downloaded": 5},
70+
# "stats": {"rows_downloaded": 5},
7171
}
7272
t1 = connect_to_table(TEST_MYSQL_CONN_STRING, self.table_src_name)
7373
t2 = connect_to_table(TEST_MYSQL_CONN_STRING, self.table_dst_name)
7474
diff = diff_tables(t1, t2)
75-
output = diff.get_stats_dict()
7675

76+
output = diff.get_stats_dict()
77+
output.pop('stats')
7778
self.assertEqual(expected_dict, output)
7879
self.assertIsNotNone(diff)
7980
assert len(list(diff)) == 1

tests/test_diff_tables.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
test_each_database: Callable = test_each_database_in_list(TEST_DATABASES)
3232

3333

34-
3534
class TestUtils(unittest.TestCase):
3635
def test_split_space(self):
3736
for i in range(0, 10):
@@ -90,24 +89,16 @@ def test_basic(self):
9089
def test_offset(self):
9190
differ = HashDiffer(bisection_factor=2, bisection_threshold=10)
9291
sec1 = self.now.shift(seconds=-3).datetime
93-
a = table_segment(
94-
self.connection, self.table_src_path, "id", "datetime", max_update=sec1, case_sensitive=False
95-
)
96-
b = table_segment(
97-
self.connection, self.table_dst_path, "id", "datetime", max_update=sec1, case_sensitive=False
98-
)
92+
a = table_segment(self.connection, self.table_src_path, "id", "datetime", max_update=sec1, case_sensitive=False)
93+
b = table_segment(self.connection, self.table_dst_path, "id", "datetime", max_update=sec1, case_sensitive=False)
9994
assert a.count() == 4, a.count()
10095
assert b.count() == 3
10196

10297
assert not list(differ.diff_tables(a, a))
10398
self.assertEqual(len(list(differ.diff_tables(a, b))), 1)
10499

105-
a = table_segment(
106-
self.connection, self.table_src_path, "id", "datetime", min_update=sec1, case_sensitive=False
107-
)
108-
b = table_segment(
109-
self.connection, self.table_dst_path, "id", "datetime", min_update=sec1, case_sensitive=False
110-
)
100+
a = table_segment(self.connection, self.table_src_path, "id", "datetime", min_update=sec1, case_sensitive=False)
101+
b = table_segment(self.connection, self.table_dst_path, "id", "datetime", min_update=sec1, case_sensitive=False)
111102
assert a.count() == 2
112103
assert b.count() == 2
113104
assert not list(differ.diff_tables(a, b))

0 commit comments

Comments
 (0)