Skip to content

Commit

Permalink
Merge pull request ClickHouse#16555 from ClickHouse/yet_another_fix_f…
Browse files Browse the repository at this point in the history
…or_00993

Better logging in replication queue, fix flacky test
  • Loading branch information
tavplubix authored Oct 31, 2020
2 parents 15a342e + ce6c44e commit db146ee
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 35 deletions.
84 changes: 50 additions & 34 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,13 +965,16 @@ void ReplicatedMergeTreeQueue::checkThereAreNoConflictsInRange(const MergeTreePa
}


bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & log_entry_name, const String & new_part_name,
String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
{
/// Let's check if the same part is now being created by another action.
if (future_parts.count(new_part_name))
{
out_reason = "Not executing log entry for part " + new_part_name
+ " because another log entry for the same part is being processed. This shouldn't happen often.";
const char * format_str = "Not executing log entry {} for part {} "
"because another log entry for the same part is being processed. This shouldn't happen often.";
LOG_INFO(log, format_str, log_entry_name, new_part_name);
out_reason = fmt::format(format_str, log_entry_name, new_part_name);
return false;

/** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed,
Expand All @@ -992,8 +995,10 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_

if (future_part.contains(result_part))
{
out_reason = "Not executing log entry for part " + new_part_name + " because it is covered by part "
+ future_part_elem.first + " that is currently executing";
const char * format_str = "Not executing log entry {} for part {} "
"because it is covered by part {} that is currently executing.";
LOG_TRACE(log, format_str, log_entry_name, new_part_name, future_part_elem.first);
out_reason = fmt::format(format_str, log_entry_name, new_part_name, future_part_elem.first);
return false;
}
}
Expand All @@ -1005,7 +1010,7 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
{
std::lock_guard lock(state_mutex);

if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock))
if (isNotCoveredByFuturePartsImpl(entry.znode_name, part_name, reject_reason, lock))
{
CurrentlyExecuting::setActualPartName(entry, part_name, *this);
return true;
Expand All @@ -1030,12 +1035,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
for (const String & new_part_name : entry.getBlockingPartNames())
{
if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, state_lock))
{
if (!out_postpone_reason.empty())
LOG_DEBUG(log, out_postpone_reason);
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
return false;
}
}
}

Expand All @@ -1051,10 +1052,11 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (future_parts.count(name))
{
String reason = "Not merging into part " + entry.new_part_name
+ " because part " + name + " is not ready yet (log entry for that part is being processed).";
LOG_TRACE(log, reason);
out_postpone_reason = reason;
const char * format_str = "Not executing log entry {} of type {} for part {} "
"because part {} is not ready yet (log entry for that part is being processed).";
LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
/// Copy-paste of above because we need structured logging (instead of already formatted message).
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
return false;
}

Expand All @@ -1070,9 +1072,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(

if (merger_mutator.merges_blocker.isCancelled())
{
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now.";
LOG_DEBUG(log, reason);
out_postpone_reason = reason;
const char * format_str = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
LOG_DEBUG(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}

Expand All @@ -1094,17 +1096,19 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (merger_mutator.ttl_merges_blocker.isCancelled())
{
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges with TTL are cancelled now.";
LOG_DEBUG(log, reason);
out_postpone_reason = reason;
const char * format_str = "Not executing log entry {} for part {} because merges with TTL are cancelled now.";
LOG_DEBUG(log, format_str,
entry.znode_name, entry.new_part_name);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.new_part_name);
return false;
}
size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList();
if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool)
{
const char * format_str = "Not executing log entry for part {}"
const char * format_str = "Not executing log entry {} for part {}"
" because {} merges with TTL already executing, maximum {}.";
LOG_DEBUG(log, format_str, entry.new_part_name, total_merges_with_ttl,
LOG_DEBUG(log, format_str, entry.znode_name,
entry.new_part_name, total_merges_with_ttl,
data_settings->max_number_of_merges_with_ttl_in_pool);

out_postpone_reason = fmt::format(format_str, entry.new_part_name, total_merges_with_ttl,
Expand All @@ -1116,15 +1120,14 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(

if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
{
const char * format_str = "Not executing log entry {} for part {}"
const char * format_str = "Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({}).";

LOG_DEBUG(log, format_str,
LOG_DEBUG(log, format_str, entry.znode_name,
entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));

/// Copy-paste of above because we need structured logging (instead of already formatted message).
out_postpone_reason = fmt::format(format_str,
out_postpone_reason = fmt::format(format_str, entry.znode_name,
entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));

Expand All @@ -1139,9 +1142,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
{
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version)
+ " because another alter " + std::to_string(head_alter)
+ " must be executed before";
const char * format_str = "Cannot execute alter metadata {} with version {} because another alter {} must be executed before";
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version, head_alter);
return false;
}
}
Expand All @@ -1153,11 +1156,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
if (head_alter == entry.alter_version)
out_postpone_reason = "Cannot execute alter data with version: "
+ std::to_string(entry.alter_version) + " because metadata still not altered";
{
const char * format_str = "Cannot execute alter data {} with version {} because metadata still not altered";
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version);
}
else
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version)
+ " because another alter " + std::to_string(head_alter) + " must be executed before";
{
const char * format_str = "Cannot execute alter data {} with version {} because another alter {} must be executed before";
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version, head_alter);
}

return false;
}
Expand All @@ -1170,7 +1179,14 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other.
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
if (currently_executing_drop_or_replace_range)
{

const char * format_str = "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry are currently executing.";
LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}
}

return true;
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class ReplicatedMergeTreeQueue
* Should be called under state_mutex.
*/
bool isNotCoveredByFuturePartsImpl(
const String & log_entry_name,
const String & new_part_name, String & out_reason,
std::lock_guard<std::mutex> & state_lock) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,7 @@ timeout $TIMEOUT bash -c thread6 2>&1 | grep "was not completely removed from Zo

wait

for i in {0..9}; do $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table_$i"; done
for i in {0..9}; do
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table_$i" &
done
wait

0 comments on commit db146ee

Please sign in to comment.