Skip to content

Commit

Permalink
Specify supported features for each storage engine, expose them via s…
Browse files Browse the repository at this point in the history
…ystem.table_engines table, properly check feature support in storage factory.
  • Loading branch information
zlobober committed Jan 25, 2020
1 parent c456ee5 commit d6f72dc
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 47 deletions.
6 changes: 4 additions & 2 deletions dbms/src/Storages/Kafka/StorageKafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ bool StorageKafka::streamToViews()

void registerStorageKafka(StorageFactory & factory)
{
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
auto creator_fn = [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size();
Expand Down Expand Up @@ -636,7 +636,9 @@ void registerStorageKafka(StorageFactory & factory)
return StorageKafka::create(
args.table_id, args.context, args.columns,
brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit);
});
};

factory.registerStorage("Kafka", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}


Expand Down
37 changes: 22 additions & 15 deletions dbms/src/Storages/MergeTree/registerStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,21 +669,28 @@ static StoragePtr create(const StorageFactory::Arguments & args)

void registerStorageMergeTree(StorageFactory & factory)
{
factory.registerStorage("MergeTree", create);
factory.registerStorage("CollapsingMergeTree", create);
factory.registerStorage("ReplacingMergeTree", create);
factory.registerStorage("AggregatingMergeTree", create);
factory.registerStorage("SummingMergeTree", create);
factory.registerStorage("GraphiteMergeTree", create);
factory.registerStorage("VersionedCollapsingMergeTree", create);

factory.registerStorage("ReplicatedMergeTree", create);
factory.registerStorage("ReplicatedCollapsingMergeTree", create);
factory.registerStorage("ReplicatedReplacingMergeTree", create);
factory.registerStorage("ReplicatedAggregatingMergeTree", create);
factory.registerStorage("ReplicatedSummingMergeTree", create);
factory.registerStorage("ReplicatedGraphiteMergeTree", create);
factory.registerStorage("ReplicatedVersionedCollapsingMergeTree", create);
StorageFactory::StorageFeatures features{
.supports_settings = true,
.supports_skipping_indices = true,
.supports_sort_order = true,
.supports_ttl = true,
};

factory.registerStorage("MergeTree", create, features);
factory.registerStorage("CollapsingMergeTree", create, features);
factory.registerStorage("ReplacingMergeTree", create, features);
factory.registerStorage("AggregatingMergeTree", create, features);
factory.registerStorage("SummingMergeTree", create, features);
factory.registerStorage("GraphiteMergeTree", create, features);
factory.registerStorage("VersionedCollapsingMergeTree", create, features);

factory.registerStorage("ReplicatedMergeTree", create, features);
factory.registerStorage("ReplicatedCollapsingMergeTree", create, features);
factory.registerStorage("ReplicatedReplacingMergeTree", create, features);
factory.registerStorage("ReplicatedAggregatingMergeTree", create, features);
factory.registerStorage("ReplicatedSummingMergeTree", create, features);
factory.registerStorage("ReplicatedGraphiteMergeTree", create, features);
factory.registerStorage("ReplicatedVersionedCollapsingMergeTree", create, features);
}

}
63 changes: 42 additions & 21 deletions dbms/src/Storages/StorageFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ static void checkAllTypesAreAllowedInTable(const NamesAndTypesList & names_and_t
}


void StorageFactory::registerStorage(const std::string & name, Creator creator)
void StorageFactory::registerStorage(const std::string & name, CreatorFn creator_fn, StorageFeatures features)
{
if (!storages.emplace(name, std::move(creator)).second)
if (!storages.emplace(name, Creator{std::move(creator_fn), features}).second)
throw Exception("TableFunctionFactory: the table function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
}
Expand Down Expand Up @@ -93,24 +93,6 @@ StoragePtr StorageFactory::get(

name = engine_def.name;

if (storage_def->settings && !endsWith(name, "MergeTree") && name != "Kafka" && name != "Join")
{
throw Exception(
"Engine " + name + " doesn't support SETTINGS clause. "
"Currently only the MergeTree family of engines, Kafka engine and Join engine support it",
ErrorCodes::BAD_ARGUMENTS);
}

if ((storage_def->partition_by || storage_def->primary_key || storage_def->order_by || storage_def->sample_by ||
storage_def->ttl_table || !columns.getColumnTTLs().empty() ||
(query.columns_list && query.columns_list->indices && !query.columns_list->indices->children.empty()))
&& !endsWith(name, "MergeTree"))
{
throw Exception(
"Engine " + name + " doesn't support PARTITION BY, PRIMARY KEY, ORDER BY, TTL or SAMPLE BY clauses and skipping indices. "
"Currently only the MergeTree family of engines supports them", ErrorCodes::BAD_ARGUMENTS);
}

if (name == "View")
{
throw Exception(
Expand Down Expand Up @@ -142,6 +124,45 @@ StoragePtr StorageFactory::get(
throw Exception("Unknown table engine " + name, ErrorCodes::UNKNOWN_STORAGE);
}


auto checkFeature = [&](String feature_description, FeatureMatcherFn feature_matcher_fn)
{
if (!feature_matcher_fn(it->second.features)) {
String msg = "Engine " + name + " doesn't support " + feature_description + ". "
"Currently only the following engines have support for the feature: [";
auto supporting_engines = getAllRegisteredNamesByFeatureMatcherFn(feature_matcher_fn);
for (size_t index = 0; index < supporting_engines.size(); ++index)
{
if (index)
msg += ", ";
msg += supporting_engines[index];
}
msg += "]";
throw Exception(msg, ErrorCodes::BAD_ARGUMENTS);
}
};

//if (storage_def->settings && !endsWith(name, "MergeTree") && name != "Kafka" && name != "Join")
if (storage_def->settings)
checkFeature(
"SETTINGS clause",
[](StorageFeatures features) { return features.supports_settings; });

if (storage_def->partition_by || storage_def->primary_key || storage_def->order_by || storage_def->sample_by)
checkFeature(
"PARTITION_BY, PRIMARY_KEY, ORDER_BY or SAMPLE_BY clauses",
[](StorageFeatures features) { return features.supports_sort_order; });

if (storage_def->ttl_table || !columns.getColumnTTLs().empty())
checkFeature(
"TTL clause",
[](StorageFeatures features) { return features.supports_ttl; });

if (query.columns_list && query.columns_list->indices && !query.columns_list->indices->children.empty())
checkFeature(
"skipping indices",
[](StorageFeatures features) { return features.supports_skipping_indices; });

Arguments arguments
{
.engine_name = name,
Expand All @@ -158,7 +179,7 @@ StoragePtr StorageFactory::get(
.has_force_restore_data_flag = has_force_restore_data_flag
};

return it->second(arguments);
return it->second.creator_fn(arguments);
}

StorageFactory & StorageFactory::instance()
Expand Down
38 changes: 34 additions & 4 deletions dbms/src/Storages/StorageFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,22 @@ class StorageFactory : private boost::noncopyable, public IHints<1, StorageFacto
bool has_force_restore_data_flag;
};

using Creator = std::function<StoragePtr(const Arguments & arguments)>;
struct StorageFeatures
{
bool supports_settings = false;
bool supports_skipping_indices = false;
bool supports_sort_order = false;
bool supports_ttl = false;
};

using CreatorFn = std::function<StoragePtr(const Arguments & arguments)>;
struct Creator
{
CreatorFn creator_fn;
StorageFeatures features;
};

using Storages = std::unordered_map<std::string, Creator>;

StoragePtr get(
const ASTCreateQuery & query,
Expand All @@ -59,9 +74,14 @@ class StorageFactory : private boost::noncopyable, public IHints<1, StorageFacto

/// Register a table engine by its name.
/// No locking, you must register all engines before usage of get.
void registerStorage(const std::string & name, Creator creator);
void registerStorage(const std::string & name, CreatorFn creator_fn, StorageFeatures features = StorageFeatures{
.supports_settings = false,
.supports_skipping_indices = false,
.supports_sort_order = false,
.supports_ttl = false,
});

const auto & getAllStorages() const
const Storages & getAllStorages() const
{
return storages;
}
Expand All @@ -74,8 +94,18 @@ class StorageFactory : private boost::noncopyable, public IHints<1, StorageFacto
return result;
}

using FeatureMatcherFn = std::function<bool(StorageFeatures)>;
std::vector<String> getAllRegisteredNamesByFeatureMatcherFn(FeatureMatcherFn feature_matcher_fn) const
{
std::vector<String> result;
for (const auto& pair : storages)
if (feature_matcher_fn(pair.second.features))
result.push_back(pair.first);
return result;
}


private:
using Storages = std::unordered_map<std::string, Creator>;
Storages storages;
};

Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/StorageJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ size_t StorageJoin::getSize() const { return join->getTotalRowCount(); }

void registerStorageJoin(StorageFactory & factory)
{
factory.registerStorage("Join", [](const StorageFactory::Arguments & args)
auto creator_fn = [](const StorageFactory::Arguments & args)
{
/// Join(ANY, LEFT, k1, k2, ...)

Expand Down Expand Up @@ -209,7 +209,9 @@ void registerStorageJoin(StorageFactory & factory)
args.constraints,
join_any_take_last_row,
args.context);
});
};

factory.registerStorage("Join", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}

template <typename T>
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/System/StorageSystemTableEngines.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/StorageFactory.h>
#include <Storages/System/StorageSystemTableEngines.h>

Expand All @@ -7,15 +8,22 @@ namespace DB

NamesAndTypesList StorageSystemTableEngines::getNamesAndTypes()
{
return {{"name", std::make_shared<DataTypeString>()}};
return {{"name", std::make_shared<DataTypeString>()},
{"supports_settings", std::make_shared<DataTypeUInt8>()},
{"supports_skipping_indices", std::make_shared<DataTypeUInt8>()},
{"supports_sort_order", std::make_shared<DataTypeUInt8>()},
{"supports_ttl", std::make_shared<DataTypeUInt8>()}};
}

void StorageSystemTableEngines::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
const auto & storages = StorageFactory::instance().getAllStorages();
for (const auto & pair : storages)
for (const auto & pair : StorageFactory::instance().getAllStorages())
{
res_columns[0]->insert(pair.first);
res_columns[1]->insert(pair.second.features.supports_settings);
res_columns[2]->insert(pair.second.features.supports_skipping_indices);
res_columns[3]->insert(pair.second.features.supports_sort_order);
res_columns[4]->insert(pair.second.features.supports_ttl);
}
}

Expand Down

0 comments on commit d6f72dc

Please sign in to comment.