test(cluster): Fix and uncomment counter sanity checks (dragonflydb#2591)
chakaz authored Feb 14, 2024
1 parent ca23d72 commit b18fe8c
Showing 2 changed files with 24 additions and 17 deletions.
25 changes: 13 additions & 12 deletions tests/dragonfly/cluster_test.py
@@ -993,8 +993,6 @@ async def test_cluster_fuzzymigration(df_local_factory: DflyInstanceFactory, df_
         for instance in instances
     ]

-    cluster_client = aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port)
-
     async def generate_config():
         return [
             {
@@ -1087,7 +1085,13 @@ async def list_counter(key, client: aioredis.RedisCluster):

     # Start ten counters
     counter_keys = [f"_counter{i}" for i in range(10)]
-    counters = [asyncio.create_task(list_counter(key, cluster_client)) for key in counter_keys]
+    counter_connections = [
+        aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port) for _ in range(10)
+    ]
+    counters = [
+        asyncio.create_task(list_counter(key, conn))
+        for key, conn in zip(counter_keys, counter_connections)
+    ]

     # Finalize slot migration
     for node in nodes:
@@ -1112,19 +1116,16 @@ async def list_counter(key, client: aioredis.RedisCluster):
         node.new_slots = []

     # Check counter consistency
-    # TODO: This fails and exposes a REAL BUG!
-    # for key in counter_keys:
-    #     counter_list = await cluster_client.lrange(key, 0, -1)
-    #     for i, j in zip(counter_list, counter_list[1:]):
-    #         print(f"comparing {i}, {j}")
-    #         assert int(i) == int(j) + 1, f"huh? {counter_list}"
+    cluster_client = aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port)
+    for key in counter_keys:
+        counter_list = await cluster_client.lrange(key, 0, -1)
+        for i, j in zip(counter_list, counter_list[1:]):
+            assert int(i) == int(j) + 1, f"Found inconsistent list in {key}: {counter_list}"

     # Compare capture
     assert await seeder.compare(capture, nodes[0].instance.port)

-    await disconnect_clients(
-        *[node.admin_client for node in nodes], *[node.client for node in nodes]
-    )
+    await asyncio.gather(*[c.close() for c in counter_connections])
+    await close_clients(
+        cluster_client, *[node.admin_client for node in nodes], *[node.client for node in nodes]
+    )
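
Note on the restored check: it reads each counter list with LRANGE and requires every adjacent pair to differ by exactly one, which only holds if each list_counter task keeps LPUSH-ing a monotonically increasing value onto its key. The helper's body is outside the hunks shown above; the following is a minimal sketch of the behaviour the assertion assumes (the start value, pacing, and loop shape are illustrative assumptions, not the actual test code):

import asyncio

import redis.asyncio as aioredis  # assumed alias; the test file refers to it as `aioredis`


async def list_counter(key, client: aioredis.RedisCluster):
    """Hypothetical sketch: keep LPUSH-ing an incrementing counter onto `key`.

    If no write is lost or duplicated during slot migration, LRANGE(key, 0, -1)
    returns [n, n-1, ..., 1, 0], so each adjacent pair (i, j) satisfies
    int(i) == int(j) + 1, which is exactly what the restored sanity check asserts.
    """
    value = 0
    while True:  # runs until the caller cancels the task
        await client.lpush(key, value)
        value += 1
        await asyncio.sleep(0.01)  # assumed pacing
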
16 changes: 11 additions & 5 deletions tests/dragonfly/utility.py
@@ -451,12 +451,17 @@ def _make_client(self, **kwargs):
         else:
             return aioredis.Redis(**kwargs)
+
+    async def _close_client(self, client):
+        if not self.cluster_mode:
+            await client.connection_pool.disconnect()
+        await client.close()
+
     async def _capture_db(self, port, target_db, keys):
         client = self._make_client(port=port, db=target_db)
         capture = DataCapture(await self._capture_entries(client, keys))

-        if hasattr(client, "connection_pool"):
-            await client.connection_pool.disconnect()
+        await self._close_client(client)

         return capture

     async def _generator_task(self, queues, target_ops=None, target_deviation=None):
@@ -530,13 +535,14 @@ async def _executor_task(self, db, queue):
                     await pipe.execute()
                 except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
                     if self.stop_on_failure:
+                        await self._close_client(client)
                         raise SystemExit(e)
                 except Exception as e:
+                    await self._close_client(client)
                     raise SystemExit(e)
                 queue.task_done()
-        await client.close()
-        if not self.cluster_mode:
-            await client.connection_pool.disconnect()
+
+        await self._close_client(client)

 CAPTURE_COMMANDS = {
     ValueType.STRING: lambda pipe, k: pipe.get(k),
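
Note on the new helper: _close_client becomes the single teardown path for clients handed out by _make_client, and it only drains the connection pool in the standalone case (the `if not self.cluster_mode` guard in the diff). A self-contained sketch of that pairing follows; the class name, `port` attribute, and `ping_once` caller are illustrative assumptions, not code from this commit:

import redis.asyncio as aioredis  # assumed alias, matching the test suite's usage


class MiniSeeder:
    """Hypothetical stand-in for the seeder class touched in this commit."""

    def __init__(self, port, cluster_mode=False):
        self.port = port
        self.cluster_mode = cluster_mode

    def _make_client(self, **kwargs):
        # Mirrors the shape of the existing factory: cluster vs. standalone client.
        if self.cluster_mode:
            return aioredis.RedisCluster(host="localhost", port=self.port, **kwargs)
        return aioredis.Redis(port=self.port, **kwargs)

    async def _close_client(self, client):
        # Same logic as the helper added above: only standalone clients need
        # their connection pool drained explicitly before close().
        if not self.cluster_mode:
            await client.connection_pool.disconnect()
        await client.close()


async def ping_once(seeder: MiniSeeder) -> bool:
    client = seeder._make_client()
    try:
        return await client.ping()
    finally:
        await seeder._close_client(client)
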
