
Make defragment_sharded_collection.py work for small number of chunks as well
kaloianm committed Jun 27, 2021
1 parent 329aec9 commit 1ee9bc8
Showing 2 changed files with 47 additions and 13 deletions.
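
The change hinges on a small lookahead() helper (visible in the diff below) that tells the per-shard loop whether the chunk being processed is the last one, so a shard whose run of chunks never reaches the merge threshold, including the single-chunk case, still gets its data size measured and recorded. A minimal sketch of the pattern, for illustration only; it mirrors the helper added in the diff, while the example inputs and the omitted data_size_kb_from_shard / try_write_chunk_size calls are purely illustrative:

    def lookahead(iterable):
        """Yield (value, has_more) pairs; has_more is False only for the last value."""
        it = iter(iterable)
        last = next(it)  # assumes at least one element, as in the script
        for val in it:
            yield last, True
            last = val
        yield last, False

    # Even a single-element input produces one iteration, with has_more == False,
    # so the "last chunk on the shard" handling always runs:
    print(list(lookahead(['only-chunk'])))       # [('only-chunk', False)]
    print(list(lookahead(['c1', 'c2', 'c3'])))   # [('c1', True), ('c2', True), ('c3', False)]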
59 changes: 47 additions & 12 deletions ctools/defragment_sharded_collection.py
@@ -134,14 +134,14 @@ async def main(args):
     with tqdm(total=num_chunks, unit=' chunks') as progress:
         async for c in cluster.configDb.chunks.find({'ns': coll.name}, sort=[('min',
                                                                               pymongo.ASCENDING)]):
-            shardId = c['shard']
+            shard_id = c['shard']
             if collectionVersion is None:
                 collectionVersion = c['lastmod']
             if c['lastmod'] > collectionVersion:
                 collectionVersion = c['lastmod']
-            if shardId not in shard_to_chunks:
-                shard_to_chunks[shardId] = {'chunks': [], 'num_merges_performed': 0}
-            shard = shard_to_chunks[shardId]
+            if shard_id not in shard_to_chunks:
+                shard_to_chunks[shard_id] = {'chunks': [], 'num_merges_performed': 0}
+            shard = shard_to_chunks[shard_id]
             shard['chunks'].append(c)
             progress.update()

@@ -220,12 +220,35 @@ async def merge_chunks_on_shard(shard, collection_version, progress):

         num_lock_busy_errors_encountered = 0

-        for c in shard_chunks:
+        def lookahead(iterable):
+            """Pass through all values from the given iterable, augmented by the
+            information if there are more values to come after the current one
+            (True), or if it is the last value (False).
+            """
+            # Get an iterator and pull the first value.
+            it = iter(iterable)
+            last = next(it)
+            # Run the iterator to exhaustion (starting from the second value).
+            for val in it:
+                # Report the *previous* value (more to come).
+                yield last, True
+                last = val
+            # Report the last value.
+            yield last, False
+
+        for c, has_more in lookahead(shard_chunks):
             progress.update()

             if len(consecutive_chunks) == 0:
                 consecutive_chunks = [c]
                 estimated_size_of_consecutive_chunks = args.phase_1_estimated_chunk_size_kb
+
+                if not args.dryrun and not has_more and not 'defrag_collection_est_size' in consecutive_chunks[
+                        -1]:
+                    chunk_range = [consecutive_chunks[-1]['min'], consecutive_chunks[-1]['max']]
+                    data_size = await coll.data_size_kb_from_shard(chunk_range)
+                    await coll.try_write_chunk_size(chunk_range, shard, data_size)
+
                 continue
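                # Illustrative note (not part of the commit): lookahead() pairs each chunk with a
                # has_more flag, so when the shard's final chunk starts a new run there is no later
                # iteration left to measure it; the block above computes its size via the dataSize
                # helper and records it right away (presumably so the cached
                # 'defrag_collection_est_size' value can be reused later).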

             merge_consecutive_chunks_without_size_check = False
@@ -241,29 +264,38 @@ async def merge_chunks_on_shard(shard, collection_version, progress):

                 consecutive_chunks = [c]
                 estimated_size_of_consecutive_chunks = args.phase_1_estimated_chunk_size_kb
+
+                if not args.dryrun and not has_more and not 'defrag_collection_est_size' in consecutive_chunks[
+                        -1]:
+                    chunk_range = [consecutive_chunks[-1]['min'], consecutive_chunks[-1]['max']]
+                    data_size = await coll.data_size_kb_from_shard(chunk_range)
+                    await coll.try_write_chunk_size(chunk_range, shard, data_size)
+
                 continue
             else:
                 merge_consecutive_chunks_without_size_check = True

             # To proceed to this stage we must have at least 2 consecutive chunks as candidates to
             # be merged
             assert (len(consecutive_chunks) > 1)

             # After we have collected a run of chunks whose estimated size is 90% of the maximum
             # chunk size, invoke `dataSize` in order to determine whether we can merge them or if
             # we should continue adding more chunks to be merged
-            if (estimated_size_of_consecutive_chunks <= target_chunk_size_kb * 0.90
-                ) and not merge_consecutive_chunks_without_size_check:
+            if (estimated_size_of_consecutive_chunks < target_chunk_size_kb * 0.90
+                ) and not merge_consecutive_chunks_without_size_check and has_more:
                 continue

             merge_bounds = [consecutive_chunks[0]['min'], consecutive_chunks[-1]['max']]

             # Determine the "exact" (not 100% exact because we use the 'estimate' option) size of
             # the currently accumulated bounds via the `dataSize` command in order to decide
             # whether this run should be merged or if we should continue adding chunks to it.
+            actual_size_of_consecutive_chunks = estimated_size_of_consecutive_chunks
             if not args.dryrun:
-                actual_size_of_consecutive_chunks = await coll.data_size_kb_from_shard(
-                    merge_bounds) if not args.dryrun else estimated_size_of_consecutive_chunks
-            else:
-                actual_size_of_consecutive_chunks = estimated_size_of_consecutive_chunks
+                actual_size_of_consecutive_chunks = await coll.data_size_kb_from_shard(merge_bounds)

-            if merge_consecutive_chunks_without_size_check:
+            if merge_consecutive_chunks_without_size_check or not has_more:
                 pass
             elif actual_size_of_consecutive_chunks < target_chunk_size_kb * 0.75:
                 # If the actual range size is still 25% less than the target size, continue adding
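            # Illustrative note (numbers are hypothetical, not from the script): with a
            # target_chunk_size_kb of, say, 100000, a run keeps growing while its estimated size
            # stays under 90000 KB and more chunks remain; dataSize then reports the actual size,
            # and a run under 75000 KB keeps growing further, otherwise it is merged. The new
            # `has_more` conditions let the final run on a shard proceed to the merge path even
            # when it never reaches those thresholds.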
@@ -298,6 +330,9 @@ async def merge_chunks_on_shard(shard, collection_version, progress):
                     f'Merging {len(consecutive_chunks)} consecutive chunks on {shard}: {merge_bounds}'
                 )

+            # Reset the accumulator so far. If we are merging due to
+            # merge_consecutive_chunks_without_size_check, need to make sure that we don't forget
+            # the current entry since it is not part of the run
             if merge_consecutive_chunks_without_size_check:
                 consecutive_chunks = [c]
                 estimated_size_of_consecutive_chunks = args.phase_1_estimated_chunk_size_kb
1 change: 0 additions & 1 deletion ctools/generate_fragmented_sharded_collection.py
@@ -43,7 +43,6 @@ async def main(args):
     uuid_shard_key_byte_order = None
     if args.shard_key_type == 'uuid':
         uuid_shard_key_byte_order = 'little' if cluster.uuid_representation == UuidRepresentation.JAVA_LEGACY else 'big'
-    if uuid_shard_key_byte_order:
         print(f'Will use {uuid_shard_key_byte_order} byte order for generating UUIDs')

     print(f'Cleaning up old entries for {args.ns} ...')
