Skip to content

Commit

Permalink
Per-connection CatalogCache prototype (cmu-db#952)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbutrovich authored Jun 17, 2020
1 parent a8aa5aa commit c50a22a
Show file tree
Hide file tree
Showing 34 changed files with 307 additions and 125 deletions.
26 changes: 13 additions & 13 deletions benchmark/catalog/catalog_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class CatalogBenchmark : public benchmark::Fixture {

std::pair<catalog::table_oid_t, catalog::index_oid_t> AddUserTableAndIndex() {
auto txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// Create the column definition (no OIDs)
std::vector<catalog::Schema::Column> cols;
Expand All @@ -65,7 +65,7 @@ class CatalogBenchmark : public benchmark::Fixture {
std::pair<catalog::table_oid_t, std::vector<catalog::index_oid_t>> AddUserTableAndIndexes(
const uint16_t num_indexes) {
auto txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// Create the column definition (no OIDs)
std::vector<catalog::Schema::Column> cols;
Expand Down Expand Up @@ -125,7 +125,7 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetAccessor)(benchmark::State &state) {

// NOLINTNEXTLINE
for (auto _ : state) {
const auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
const auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);
TERRIER_ASSERT(accessor != nullptr, "getting accessor should not fail");
}

Expand Down Expand Up @@ -166,7 +166,7 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetIndex)(benchmark::State &state) {
const auto oids UNUSED_ATTRIBUTE = AddUserTableAndIndex();

auto *txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// NOLINTNEXTLINE
for (auto _ : state) {
Expand All @@ -184,7 +184,7 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetIndexOid)(benchmark::State &state) {
const auto oids UNUSED_ATTRIBUTE = AddUserTableAndIndex();

auto *txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// NOLINTNEXTLINE
for (auto _ : state) {
Expand All @@ -202,7 +202,7 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetIndexes)(benchmark::State &state) {
const auto oids UNUSED_ATTRIBUTE = AddUserTableAndIndex();

auto *txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// NOLINTNEXTLINE
for (auto _ : state) {
Expand All @@ -220,7 +220,7 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetIndexSchema)(benchmark::State &state) {
const auto oids UNUSED_ATTRIBUTE = AddUserTableAndIndex();

auto *txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// NOLINTNEXTLINE
for (auto _ : state) {
Expand All @@ -235,13 +235,13 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetIndexSchema)(benchmark::State &state) {
// NOLINTNEXTLINE
BENCHMARK_DEFINE_F(CatalogBenchmark, GetNamespaceOid)(benchmark::State &state) {
auto txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);
const auto ns_oid UNUSED_ATTRIBUTE = accessor->CreateNamespace("test_namespace");
TERRIER_ASSERT(ns_oid != catalog::INVALID_NAMESPACE_OID, "namespace creation should not fail");
txn_manager_->Commit(txn, transaction::TransactionUtil::EmptyCallback, nullptr);

txn = txn_manager_->BeginTransaction();
accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// NOLINTNEXTLINE
for (auto _ : state) {
Expand All @@ -259,7 +259,7 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetSchema)(benchmark::State &state) {
const auto oids UNUSED_ATTRIBUTE = AddUserTableAndIndex();

auto *txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// NOLINTNEXTLINE
for (auto _ : state) {
Expand All @@ -276,7 +276,7 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetTable)(benchmark::State &state) {
const auto oids UNUSED_ATTRIBUTE = AddUserTableAndIndex();

auto *txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// NOLINTNEXTLINE
for (auto _ : state) {
Expand All @@ -294,7 +294,7 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetTableOid)(benchmark::State &state) {
const auto oids UNUSED_ATTRIBUTE = AddUserTableAndIndex();

auto *txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// NOLINTNEXTLINE
for (auto _ : state) {
Expand All @@ -313,7 +313,7 @@ BENCHMARK_DEFINE_F(CatalogBenchmark, GetIndexObjects)(benchmark::State &state) {
const auto oids UNUSED_ATTRIBUTE = AddUserTableAndIndexes(num_indexes);

auto *txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_, DISABLED);

// NOLINTNEXTLINE
for (auto _ : state) {
Expand Down
16 changes: 8 additions & 8 deletions benchmark/runner/mini_runners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ class MiniRunners : public benchmark::Fixture {
std::unique_ptr<planner::AbstractPlanNode> JoinNonSelfCorrector(
std::string build_tbl, common::ManagedPointer<transaction::TransactionContext> txn,
std::unique_ptr<planner::AbstractPlanNode> plan) {
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED);
auto build_oid = accessor->GetTableOid(std::move(build_tbl));

if (plan->GetPlanNodeType() != planner::PlanNodeType::HASHJOIN) throw "Expected HashJoin";
Expand Down Expand Up @@ -692,7 +692,7 @@ class MiniRunners : public benchmark::Fixture {
auto txn = txn_manager_->BeginTransaction();
auto stmt_list = parser::PostgresParser::BuildParseTree(query);

auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED);
auto binder = binder::BindNodeVisitor(common::ManagedPointer(accessor), db_oid);
binder.BindNameToNode(common::ManagedPointer(stmt_list), nullptr, nullptr);

Expand Down Expand Up @@ -724,7 +724,7 @@ class MiniRunners : public benchmark::Fixture {
}

auto txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED);

auto exec_ctx = std::make_unique<execution::exec::ExecutionContext>(db_oid, common::ManagedPointer(txn),
execution::exec::NoOpResultConsumer(),
Expand All @@ -746,7 +746,7 @@ class MiniRunners : public benchmark::Fixture {
void BenchmarkArithmetic(brain::ExecutionOperatingUnitType type, size_t num_elem) {
auto qid = MiniRunners::query_id++;
auto txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED);
auto exec_ctx = std::make_unique<execution::exec::ExecutionContext>(db_oid, common::ManagedPointer(txn), nullptr,
nullptr, common::ManagedPointer(accessor));
exec_ctx->SetExecutionMode(static_cast<uint8_t>(mode));
Expand Down Expand Up @@ -952,7 +952,7 @@ BENCHMARK_DEFINE_F(MiniRunners, SEQ0_OutputRunners)(benchmark::State &state) {
auto tuple_size = int_size * num_integers + decimal_size * num_decimals;

auto txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED);
auto schema = std::make_unique<planner::OutputSchema>(std::move(cols));

execution::ExecutableQuery::query_identifier.store(MiniRunners::query_id++);
Expand Down Expand Up @@ -1113,7 +1113,7 @@ void MiniRunners::ExecuteInsert(benchmark::State *state) {
catalog::table_oid_t tbl_oid;
{
auto txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED);
tbl_oid = accessor->CreateTable(accessor->GetDefaultNamespace(), "tmp_table", tmp_schema);
auto &schema = accessor->GetSchema(tbl_oid);
auto *tmp_table = new storage::SqlTable(db_main->GetStorageLayer()->GetBlockStore(), schema);
Expand Down Expand Up @@ -1164,7 +1164,7 @@ void MiniRunners::ExecuteInsert(benchmark::State *state) {
// Drop the table
{
auto txn = txn_manager_->BeginTransaction();
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid);
auto accessor = catalog_->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED);
accessor->DropTable(tbl_oid);
txn_manager_->Commit(txn, transaction::TransactionUtil::EmptyCallback, nullptr);
}
Expand Down Expand Up @@ -1595,7 +1595,7 @@ void InitializeRunnersState() {
db_oid = catalog->CreateDatabase(common::ManagedPointer(txn), "test_db", true);

// Load the database
auto accessor = catalog->GetAccessor(common::ManagedPointer(txn), db_oid);
auto accessor = catalog->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED);
auto exec_ctx = std::make_unique<execution::exec::ExecutionContext>(db_oid, common::ManagedPointer(txn), nullptr,
nullptr, common::ManagedPointer(accessor));

Expand Down
2 changes: 1 addition & 1 deletion benchmark/storage/recovery_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ BENCHMARK_DEFINE_F(RecoveryBenchmark, IndexRecovery)(benchmark::State &state) {
// Create database, namespace, and table
auto *txn = txn_manager->BeginTransaction();
auto db_oid = catalog->CreateDatabase(common::ManagedPointer(txn), db_name, true);
auto catalog_accessor = catalog->GetAccessor(common::ManagedPointer(txn), db_oid);
auto catalog_accessor = catalog->GetAccessor(common::ManagedPointer(txn), db_oid, DISABLED);
auto namespace_oid = catalog_accessor->CreateNamespace(namespace_name);

// Create random table
Expand Down
1 change: 1 addition & 0 deletions src/binder/bind_node_visitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "catalog/catalog_defs.h"
#include "common/exception.h"
#include "common/managed_pointer.h"
#include "execution/functions/function_context.h"
#include "loggers/binder_logger.h"
#include "parser/expression/abstract_expression.h"
#include "parser/expression/aggregate_expression.h"
Expand Down
12 changes: 10 additions & 2 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <vector>

#include "catalog/catalog_accessor.h"
#include "catalog/catalog_cache.h"
#include "catalog/database_catalog.h"
#include "catalog/postgres/builder.h"
#include "catalog/postgres/pg_database.h"
Expand Down Expand Up @@ -203,10 +204,17 @@ common::ManagedPointer<DatabaseCatalog> Catalog::GetDatabaseCatalog(
}

std::unique_ptr<CatalogAccessor> Catalog::GetAccessor(const common::ManagedPointer<transaction::TransactionContext> txn,
db_oid_t database) {
db_oid_t database,
const common::ManagedPointer<CatalogCache> cache) {
auto dbc = this->GetDatabaseCatalog(common::ManagedPointer(txn), database);
if (dbc == nullptr) return nullptr;
return std::make_unique<CatalogAccessor>(common::ManagedPointer(this), dbc, txn);
if (cache != DISABLED) {
const auto last_ddl_change = dbc->write_lock_.load();
const bool invalidate_cache = transaction::TransactionUtil::Committed(last_ddl_change) &&
transaction::TransactionUtil::NewerThan(last_ddl_change, cache->OldestEntry());
if (invalidate_cache) cache->Reset(txn->StartTime());
}
return std::make_unique<CatalogAccessor>(common::ManagedPointer(this), dbc, txn, cache);
}

bool Catalog::CreateDatabaseEntry(const common::ManagedPointer<transaction::TransactionContext> txn, const db_oid_t db,
Expand Down
30 changes: 30 additions & 0 deletions src/catalog/catalog_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <vector>

#include "catalog/catalog.h"
#include "catalog/catalog_cache.h"
#include "catalog/database_catalog.h"
#include "catalog/postgres/pg_proc.h"

namespace terrier::catalog {
Expand Down Expand Up @@ -77,6 +79,15 @@ bool CatalogAccessor::SetTablePointer(table_oid_t table, storage::SqlTable *tabl
}

common::ManagedPointer<storage::SqlTable> CatalogAccessor::GetTable(table_oid_t table) const {
if (cache_ != DISABLED) {
auto table_ptr = cache_->GetTable(table);
if (table_ptr == nullptr) {
// not in the cache, get it from the actual catalog, stash it, and return retrieved value
table_ptr = dbc_->GetTable(txn_, table);
cache_->PutTable(table, table_ptr);
}
return table_ptr;
}
return dbc_->GetTable(txn_, table);
}

Expand All @@ -91,6 +102,16 @@ std::vector<constraint_oid_t> CatalogAccessor::GetConstraints(table_oid_t table)
}

std::vector<index_oid_t> CatalogAccessor::GetIndexOids(table_oid_t table) const {
if (cache_ != DISABLED) {
auto cache_lookup = cache_->GetIndexOids(table);
if (!cache_lookup.first) {
// not in the cache, get it from the actual catalog, stash it, and return retrieved value
const auto index_oids = dbc_->GetIndexOids(txn_, table);
cache_->PutIndexOids(table, index_oids);
return index_oids;
}
return cache_lookup.second;
}
return dbc_->GetIndexOids(txn_, table);
}

Expand Down Expand Up @@ -130,6 +151,15 @@ bool CatalogAccessor::SetIndexPointer(index_oid_t index, storage::index::Index *
}

common::ManagedPointer<storage::index::Index> CatalogAccessor::GetIndex(index_oid_t index) const {
if (cache_ != DISABLED) {
auto index_ptr = cache_->GetIndex(index);
if (index_ptr == nullptr) {
// not in the cache, get it from the actual catalog, stash it, and return retrieved value
index_ptr = dbc_->GetIndex(txn_, index);
cache_->PutIndex(index, index_ptr);
}
return index_ptr;
}
return dbc_->GetIndex(txn_, index);
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution/compiler/codegen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "date/date.h"
#include "execution/sql/value.h"
#include "parser/expression/constant_value_expression.h"
#include "storage/index/index.h"
#include "storage/index/index_defs.h"
#include "util/time_util.h"

namespace terrier::execution::compiler {
Expand Down
1 change: 1 addition & 0 deletions src/execution/compiler/operator/delete_translator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "catalog/catalog_accessor.h"
#include "execution/compiler/function_builder.h"
#include "execution/compiler/translator_factory.h"
#include "storage/index/index.h"

namespace terrier::execution::compiler {
DeleteTranslator::DeleteTranslator(const terrier::planner::DeletePlanNode *op, CodeGen *codegen)
Expand Down
1 change: 1 addition & 0 deletions src/execution/compiler/operator/index_join_translator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "execution/compiler/operator/operator_translator.h"
#include "execution/compiler/translator_factory.h"
#include "planner/plannodes/index_join_plan_node.h"
#include "storage/index/index.h"

namespace terrier::execution::compiler {

Expand Down
1 change: 1 addition & 0 deletions src/execution/compiler/operator/index_scan_translator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "execution/compiler/operator/operator_translator.h"
#include "execution/compiler/translator_factory.h"
#include "planner/plannodes/index_join_plan_node.h"
#include "storage/index/index.h"

namespace terrier::execution::compiler {

Expand Down
1 change: 1 addition & 0 deletions src/execution/compiler/operator/insert_translator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "catalog/catalog_accessor.h"
#include "execution/compiler/function_builder.h"
#include "execution/compiler/translator_factory.h"
#include "storage/index/index.h"

namespace terrier::execution::compiler {
InsertTranslator::InsertTranslator(const terrier::planner::InsertPlanNode *op, CodeGen *codegen)
Expand Down
1 change: 1 addition & 0 deletions src/execution/compiler/operator/update_translator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "catalog/catalog_accessor.h"
#include "execution/compiler/function_builder.h"
#include "execution/compiler/translator_factory.h"
#include "storage/index/index.h"

namespace terrier::execution::compiler {
UpdateTranslator::UpdateTranslator(const terrier::planner::UpdatePlanNode *op, CodeGen *codegen)
Expand Down
1 change: 1 addition & 0 deletions src/execution/sql/storage_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "catalog/catalog_accessor.h"
#include "execution/exec/execution_context.h"
#include "execution/util/execution_common.h"
#include "storage/index/index.h"

namespace terrier::execution::sql {

Expand Down
5 changes: 4 additions & 1 deletion src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class GarbageCollector;
}

namespace terrier::catalog {

class CatalogCache;
class DatabaseCatalog;
class CatalogAccessor;

Expand Down Expand Up @@ -119,10 +121,11 @@ class Catalog {
* Creates a new accessor into the catalog which will handle transactionality and sequencing of catalog operations.
* @param txn for all subsequent catalog queries
* @param database in which this transaction is scoped
* @param cache CatalogCache object for this connection, or nullptr if disabled
* @return a CatalogAccessor object for use with this transaction
*/
std::unique_ptr<CatalogAccessor> GetAccessor(common::ManagedPointer<transaction::TransactionContext> txn,
db_oid_t database);
db_oid_t database, common::ManagedPointer<CatalogCache> cache);

/**
* @return Catalog's BlockStore
Expand Down
Loading

0 comments on commit c50a22a

Please sign in to comment.