Skip to content

Commit

Permalink
Merge pull request ClickHouse#24567 from ClickHouse/backport/21.6/24157
Browse files Browse the repository at this point in the history
Backport ClickHouse#24157 to 21.6: Adjust modulo function when used in partition key
  • Loading branch information
kssenii authored May 31, 2021
2 parents d54c0da + c616cb4 commit 25eb234
Show file tree
Hide file tree
Showing 29 changed files with 395 additions and 40 deletions.
6 changes: 6 additions & 0 deletions src/DataTypes/NumberTraits.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ template <typename A, typename B> struct ResultOfModulo
using Type = std::conditional_t<std::is_floating_point_v<A> || std::is_floating_point_v<B>, Float64, Type0>;
};

/// Result type used by the legacy modulo implementation (see ModuloLegacyImpl) for operand types A and B.
/// Exposes the same member names (Type0, Type) as ResultOfModulo.
template <typename A, typename B> struct ResultOfModuloLegacy
{
/// Candidate type requested from Construct: the first argument is true when either A or B is signed,
/// the second argument is always false, the third argument is sizeof(B).
/// NOTE(review): presumably the arguments mean (is signed, is floating, size in bytes) - confirm against Construct.
using Type0 = typename Construct<is_signed_v<A> || is_signed_v<B>, false, sizeof(B)>::Type;
/// Final type: Float64 when either operand is a floating point type, otherwise Type0.
using Type = std::conditional_t<std::is_floating_point_v<A> || std::is_floating_point_v<B>, Float64, Type0>;
};

template <typename A> struct ResultOfNegate
{
using Type = typename Construct<
Expand Down
6 changes: 6 additions & 0 deletions src/Functions/DivisionUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,10 @@ struct ModuloImpl
#endif
};

/// Implementation used by the `moduloLegacy` function.
/// Inherits everything from ModuloImpl<A, B>; the only member declared here is ResultType,
/// which is taken from NumberTraits::ResultOfModuloLegacy.
template <typename A, typename B>
struct ModuloLegacyImpl : ModuloImpl<A, B>
{
/// NOTE(review): presumably hides a ResultType declared by ModuloImpl - confirm against the base.
using ResultType = typename NumberTraits::ResultOfModuloLegacy<A, B>::Type;
};

}
13 changes: 13 additions & 0 deletions src/Functions/modulo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ struct ModuloByConstantImpl
}
};

/// Legacy counterpart of ModuloByConstantImpl.
/// Inherits everything from ModuloByConstantImpl<A, B>; the only member declared here is Op.
template <typename A, typename B>
struct ModuloLegacyByConstantImpl : ModuloByConstantImpl<A, B>
{
/// The underlying operation is the legacy modulo implementation.
/// NOTE(review): members of the base that were already defined in terms of the base's own Op
/// are not changed by this alias - confirm this is the intended behaviour.
using Op = ModuloLegacyImpl<A, B>;
};
}

/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
Expand Down Expand Up @@ -134,4 +139,12 @@ void registerFunctionModulo(FunctionFactory & factory)
factory.registerAlias("mod", "modulo", FunctionFactory::CaseInsensitive);
}

/// Name tag: the function is exposed under the name "moduloLegacy".
struct NameModuloLegacy { static constexpr auto name = "moduloLegacy"; };
/// Function type built from ModuloLegacyImpl and the name tag above.
/// NOTE(review): the meaning of the trailing `false` argument is defined by BinaryArithmeticOverloadResolver - confirm there.
using FunctionModuloLegacy = BinaryArithmeticOverloadResolver<ModuloLegacyImpl, NameModuloLegacy, false>;

/// Registers FunctionModuloLegacy in the given factory.
/// Unlike registerFunctionModulo, no alias is registered for this function.
void registerFunctionModuloLegacy(FunctionFactory & factory)
{
factory.registerFunction<FunctionModuloLegacy>();
}

}
2 changes: 2 additions & 0 deletions src/Functions/registerFunctionsArithmetic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ void registerFunctionIntDiv(FunctionFactory & factory);
void registerFunctionIntDivOrZero(FunctionFactory & factory);
void registerFunctionModulo(FunctionFactory & factory);
void registerFunctionModuloOrZero(FunctionFactory & factory);
void registerFunctionModuloLegacy(FunctionFactory & factory);
void registerFunctionNegate(FunctionFactory & factory);
void registerFunctionAbs(FunctionFactory & factory);
void registerFunctionBitAnd(FunctionFactory & factory);
Expand Down Expand Up @@ -51,6 +52,7 @@ void registerFunctionsArithmetic(FunctionFactory & factory)
registerFunctionIntDivOrZero(factory);
registerFunctionModulo(factory);
registerFunctionModuloOrZero(factory);
registerFunctionModuloLegacy(factory);
registerFunctionNegate(factory);
registerFunctionAbs(factory);
registerFunctionBitAnd(factory);
Expand Down
25 changes: 25 additions & 0 deletions src/Storages/KeyDescription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Functions/IFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
Expand Down Expand Up @@ -86,6 +87,30 @@ KeyDescription KeyDescription::getKeyFromAST(
return getSortingKeyFromAST(definition_ast, columns, context, {});
}

/// Recursively rename every function called `modulo` in the AST rooted at `node_expr` to `moduloLegacy`.
///
/// The tree is modified in place. Only ASTFunction nodes are inspected, and the walk descends
/// only through the `arguments` of function nodes; any other node kind stops the descent.
///
/// Returns true if at least one function was renamed, false otherwise (including for a null node).
bool KeyDescription::moduloToModuloLegacyRecursive(ASTPtr node_expr)
{
    if (!node_expr)
        return false;

    auto * function_expr = node_expr->as<ASTFunction>();
    if (!function_expr)
        return false;

    bool modulo_in_ast = false;
    if (function_expr->name == "modulo")
    {
        function_expr->name = "moduloLegacy";
        modulo_in_ast = true;
    }

    if (function_expr->arguments)
    {
        /// Iterate over the argument list in place: the recursion only renames function nodes and never
        /// changes the list itself, so copying the vector of shared pointers is unnecessary.
        /// `|=` is used on purpose: every child must be visited even after the first rename was found.
        for (const auto & child : function_expr->arguments->children)
            modulo_in_ast |= moduloToModuloLegacyRecursive(child);
    }

    return modulo_in_ast;
}

KeyDescription KeyDescription::getSortingKeyFromAST(
const ASTPtr & definition_ast,
const ColumnsDescription & columns,
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/KeyDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ struct KeyDescription
/// unintentionally share AST variables and modify them.
KeyDescription(const KeyDescription & other);
KeyDescription & operator=(const KeyDescription & other);

/// Substitute modulo with moduloLegacy. Used in KeyCondition to allow proper comparison with keys.
static bool moduloToModuloLegacyRecursive(ASTPtr node_expr);
};

}
8 changes: 5 additions & 3 deletions src/Storages/MergeTree/DataPartsExchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)

MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & replica_path,
const String & host,
Expand Down Expand Up @@ -493,7 +494,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
if (e.code() != ErrorCodes::S3_ERROR)
throw;
/// Try again but without S3 copy
return fetchPart(metadata_snapshot, part_name, replica_path, host, port, timeouts,
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false);
}
}
Expand Down Expand Up @@ -557,14 +558,15 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(

MergeTreeData::DataPart::Checksums checksums;
return part_type == "InMemory"
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, std::move(reservation), in, projections)
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums);
}

MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
const String & part_name,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in,
size_t projections)
Expand Down Expand Up @@ -619,7 +621,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
new_data_part->is_temp = true;
new_data_part->setColumns(block.getNamesAndTypesList());
new_data_part->minmax_idx.update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
new_data_part->partition.create(metadata_snapshot, block, 0);
new_data_part->partition.create(metadata_snapshot, block, 0, context);

MergedBlockOutputStream part_out(
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/DataPartsExchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Fetcher final : private boost::noncopyable
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
MergeTreeData::MutableDataPartPtr fetchPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & replica_path,
const String & host,
Expand Down Expand Up @@ -106,6 +107,7 @@ class Fetcher final : private boost::noncopyable
const String & part_name,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in,
size_t projections);
Expand Down
41 changes: 35 additions & 6 deletions src/Storages/MergeTree/KeyCondition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Parsers/ASTIdentifier.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Storages/KeyDescription.h>

#include <cassert>
#include <stack>
Expand Down Expand Up @@ -591,6 +592,30 @@ void KeyCondition::traverseAST(const ASTPtr & node, ContextPtr context, Block &
rpn.emplace_back(std::move(element));
}

bool KeyCondition::canConstantBeWrapped(const ASTPtr & node, const String & expr_name, String & result_expr_name)
{
const auto & sample_block = key_expr->getSampleBlock();

/// sample_block from key_expr cannot contain modulo and moduloLegacy at the same time.
/// For partition key it is always moduloLegacy.
if (sample_block.has(expr_name))
{
result_expr_name = expr_name;
}
else
{
auto adjusted_ast = node->clone();
KeyDescription::moduloToModuloLegacyRecursive(adjusted_ast);
String adjusted_expr_name = adjusted_ast->getColumnName();

if (!sample_block.has(adjusted_expr_name))
return false;

result_expr_name = adjusted_expr_name;
}

return true;
}

bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
const ASTPtr & node,
Expand All @@ -600,11 +625,13 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
DataTypePtr & out_type)
{
// Constant expr should use alias names if any
String expr_name = node->getColumnName();
const auto & sample_block = key_expr->getSampleBlock();
if (!sample_block.has(expr_name))
String passed_expr_name = node->getColumnName();
String expr_name;
if (!canConstantBeWrapped(node, passed_expr_name, expr_name))
return false;

const auto & sample_block = key_expr->getSampleBlock();

/// TODO Nullable index is not yet landed.
if (out_value.isNull())
return false;
Expand Down Expand Up @@ -668,11 +695,13 @@ bool KeyCondition::canConstantBeWrappedByFunctions(
const ASTPtr & ast, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type)
{
// Constant expr should use alias names if any
String expr_name = ast->getColumnName();
const auto & sample_block = key_expr->getSampleBlock();
if (!sample_block.has(expr_name))
String passed_expr_name = ast->getColumnName();
String expr_name;
if (!canConstantBeWrapped(ast, passed_expr_name, expr_name))
return false;

const auto & sample_block = key_expr->getSampleBlock();

/// TODO Nullable index is not yet landed.
if (out_value.isNull())
return false;
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/MergeTree/KeyCondition.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,12 @@ class KeyCondition
bool canConstantBeWrappedByFunctions(
const ASTPtr & ast, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type);

/// Check whether the AST node passed to canConstantBeWrappedBy* can be used by them for further checks.
/// Always call this method at the start of other methods that require key comparison, because it also checks
/// whether the adjusted key expression (with modulo replaced by moduloLegacy) can be used. This is needed because
/// the partition key is always modified when passed into KeyCondition: modulo is recursively replaced by moduloLegacy.
bool canConstantBeWrapped(const ASTPtr & node, const String & expr_name, String & result_expr_name);

/// If it's possible to make an RPNElement
/// that will filter values (possibly tuples) by the content of 'prepared_set',
/// do it and return true.
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ void MergeTreeBlockOutputStream::writePrefix()

void MergeTreeBlockOutputStream::write(const Block & block)
{
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
for (auto & current_block : part_blocks)
{
Stopwatch watch;
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
// Generate valid expressions for filtering
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, local_context, virtual_columns_block, expression_ast);

PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, local_context, true /* strict */);
PartitionPruner partition_pruner(metadata_snapshot, query_info, local_context, true /* strict */);
if (partition_pruner.isUseless() && !valid)
return {};

Expand Down Expand Up @@ -876,13 +876,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
/// Create and correctly initialize global WAL object
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot))
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
}
else if (settings->in_memory_parts_enable_wal)
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot))
for (auto && part : wal.restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(

minmax_idx_condition.emplace(
query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)));
partition_pruner.emplace(metadata_snapshot_base->getPartitionKey(), query_info, context, false /* strict */);
partition_pruner.emplace(metadata_snapshot_base, query_info, context, false /* strict */);

if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless()))
{
Expand Down
11 changes: 6 additions & 5 deletions src/Storages/MergeTree/MergeTreeDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ void updateTTL(

}

BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot)
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
BlocksWithPartition result;
if (!block || !block.rows())
Expand All @@ -155,12 +156,12 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
}

Block block_copy = block;
const auto & partition_key = metadata_snapshot->getPartitionKey();
partition_key.expression->execute(block_copy);
/// After expression execution partition key columns will be added to block_copy with names regarding partition function.
auto partition_key_names_and_types = MergeTreePartition::executePartitionByExpression(metadata_snapshot, block_copy, context);

ColumnRawPtrs partition_columns;
partition_columns.reserve(partition_key.sample_block.columns());
for (const ColumnWithTypeAndName & element : partition_key.sample_block)
partition_columns.reserve(partition_key_names_and_types.size());
for (const auto & element : partition_key_names_and_types)
partition_columns.emplace_back(block_copy.getByName(element.name).column.get());

PODArray<size_t> partition_num_to_first_row;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeDataWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class MergeTreeDataWriter
* (split rows by partition)
* Works deterministically: if same block was passed, function will return same result in same order.
*/
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot);
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);

/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
Expand Down
Loading

0 comments on commit 25eb234

Please sign in to comment.