Skip to content

Commit

Permalink
Merge branch 'inc' into frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
jarulraj committed Jun 30, 2016
2 parents 5e6c18f + a3d80b9 commit b6f1239
Show file tree
Hide file tree
Showing 41 changed files with 675 additions and 4,126 deletions.
8 changes: 5 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ if (COVERALLS)
endif()

# -- [ Doxygen
include(doxygen)

doxygen_configure(src/ test/)
if (DOXYGEN)
include(doxygen)
doxygen_configure(src/ test/)
endif()

# ---[ Linter target
add_custom_target(lint COMMAND ${CMAKE_COMMAND} -P ${PROJECT_SOURCE_DIR}/cmake/lint.cmake)

# ---[ Configuration summary
peloton_print_configuration_summary()

Expand Down
7 changes: 1 addition & 6 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ set_target_properties(peloton-bin PROPERTIES OUTPUT_NAME peloton)

# --[ Benchmarks

# --[ hyadapt
file(GLOB_RECURSE hyadapt_srcs ${PROJECT_SOURCE_DIR}/src/main/hyadapt/*.cpp)
add_executable(hyadapt EXCLUDE_FROM_ALL ${hyadapt_srcs})
target_link_libraries(hyadapt peloton)

# --[ ycsb
file(GLOB_RECURSE ycsb_srcs ${PROJECT_SOURCE_DIR}/src/main/ycsb/*.cpp)
add_executable(ycsb EXCLUDE_FROM_ALL ${ycsb_srcs})
Expand Down Expand Up @@ -73,6 +68,6 @@ target_link_libraries(logger peloton)

add_custom_target(benchmark)

add_dependencies(benchmark hyadapt ycsb tpcc sdbench logger)
add_dependencies(benchmark ycsb tpcc sdbench logger)


4 changes: 2 additions & 2 deletions src/container/skip_list_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ namespace peloton {
SKIP_LIST_MAP_TEMPLATE_ARGUMENTS
SKIP_LIST_MAP_TYPE::SkipListMap(){

LOG_INFO("Creating Skip List Map");
LOG_TRACE("Creating Skip List Map");

}

SKIP_LIST_MAP_TEMPLATE_ARGUMENTS
SKIP_LIST_MAP_TYPE::~SkipListMap(){

LOG_INFO("Destroying Skip List Map");
LOG_TRACE("Destroying Skip List Map");

}

Expand Down
131 changes: 87 additions & 44 deletions src/executor/hybrid_scan_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ namespace executor {

HybridScanExecutor::HybridScanExecutor(const planner::AbstractPlan *node,
ExecutorContext *executor_context)
: AbstractScanExecutor(node, executor_context),
indexed_tile_offset_(START_OID) {}
: AbstractScanExecutor(node, executor_context),
indexed_tile_offset_(START_OID) {}

bool HybridScanExecutor::DInit() {
auto status = AbstractScanExecutor::DInit();
Expand All @@ -55,27 +55,36 @@ bool HybridScanExecutor::DInit() {
table_ = node.GetTable();
index_ = node.GetDataIndex();
type_ = node.GetHybridType();
PL_ASSERT(table_ != nullptr);

if (type_ == planner::SEQ) {
// SEQUENTIAL SCAN
if (type_ == HYBRID_SCAN_TYPE_SEQUENTIAL) {
LOG_TRACE("Sequential Scan");
current_tile_group_offset_ = START_OID;
if (table_ != nullptr) {
table_tile_group_count_ = table_->GetTileGroupCount();
if (column_ids_.empty()) {
column_ids_.resize(table_->GetSchema()->GetColumnCount());
std::iota(column_ids_.begin(), column_ids_.end(), 0);
}

table_tile_group_count_ = table_->GetTileGroupCount();
if (column_ids_.empty()) {
column_ids_.resize(table_->GetSchema()->GetColumnCount());
std::iota(column_ids_.begin(), column_ids_.end(), 0);
}
}
// INDEX SCAN
else if (type_ == HYBRID_SCAN_TYPE_INDEX) {
LOG_TRACE("Index Scan");
index_ = node.GetIndex();

} else if (type_ == planner::INDEX) {
result_itr_ = START_OID;
index_done_ = false;
result_.clear();

PL_ASSERT(index_ != nullptr);
column_ids_ = node.GetColumnIds();
auto key_column_ids_ = node.GetKeyColumnIds();
auto expr_types_ = node.GetExprTypes();
values_ = node.GetValues();
auto runtime_keys_ = node.GetRunTimeKeys();
predicate_ = node.GetPredicate();
key_ready_ = false;

if (runtime_keys_.size() != 0) {
assert(runtime_keys_.size() == values_.size());
Expand All @@ -94,13 +103,19 @@ bool HybridScanExecutor::DInit() {
}

if (table_ != nullptr) {
LOG_TRACE("Column count : %u", table_->GetSchema()->GetColumnCount());
full_column_ids_.resize(table_->GetSchema()->GetColumnCount());
std::iota(full_column_ids_.begin(), full_column_ids_.end(), 0);
}
} else { // Hybrid type.
}
// HYBRID SCAN
else if (type_ == HYBRID_SCAN_TYPE_HYBRID) {
LOG_TRACE("Hybrid Scan");

table_tile_group_count_ = table_->GetTileGroupCount();
int offset = index_->GetIndexedTileGroupOff();
indexed_tile_offset_ = (offset == -1) ? INVALID_OID : (oid_t)offset;
block_threshold = 0;

if (indexed_tile_offset_ == INVALID_OID) {
current_tile_group_offset_ = START_OID;
Expand Down Expand Up @@ -149,6 +164,10 @@ bool HybridScanExecutor::DInit() {
std::iota(full_column_ids_.begin(), full_column_ids_.end(), 0);
}
}
// FALLBACK
else {
throw Exception("Invalid hybrid scan type : " + std::to_string(type_));
}

return true;
}
Expand All @@ -165,6 +184,7 @@ bool HybridScanExecutor::SeqScanUtil() {

// Retrieve next tile group.
while (current_tile_group_offset_ < table_tile_group_count_) {
LOG_TRACE("Current tile group offset : %u", current_tile_group_offset_);
auto tile_group = table_->GetTileGroup(current_tile_group_offset_++);
auto tile_group_header = tile_group->GetHeader();

Expand All @@ -181,28 +201,29 @@ bool HybridScanExecutor::SeqScanUtil() {
std::vector<oid_t> position_list;
for (oid_t tuple_id = 0; tuple_id < active_tuple_count; tuple_id++) {
ItemPointer location(tile_group->GetTileGroupId(), tuple_id);
if (type_ == planner::HYBRID && item_pointers_.size() > 0 &&
if (type_ == HYBRID_SCAN_TYPE_HYBRID && item_pointers_.size() > 0 &&
location.block <= upper_bound_block) {
if (item_pointers_.find(location) != item_pointers_.end()) {
continue;
}
}

// check transaction visibility
// Check transaction visibility
if (transaction_manager.IsVisible(tile_group_header, tuple_id)) {
// if the tuple is visible, then perform predicate evaluation.
// If the tuple is visible, then perform predicate evaluation.
if (predicate_ == nullptr) {
position_list.push_back(tuple_id);
} else {
}
else {
expression::ContainerTuple<storage::TileGroup> tuple(tile_group.get(),
tuple_id);
auto eval =
predicate_->Evaluate(&tuple, nullptr, executor_context_).IsTrue();
auto eval = predicate_->Evaluate(&tuple, nullptr, executor_context_).IsTrue();
if (eval == true) {
position_list.push_back(tuple_id);
}
}
} else {
}
else {
expression::ContainerTuple<storage::TileGroup> tuple(tile_group.get(),
tuple_id);
auto eval =
Expand All @@ -217,6 +238,7 @@ bool HybridScanExecutor::SeqScanUtil() {
}
}
}

// Don't return empty tiles
if (position_list.size() == 0) {
continue;
Expand All @@ -226,8 +248,10 @@ bool HybridScanExecutor::SeqScanUtil() {
std::unique_ptr<LogicalTile> logical_tile(LogicalTileFactory::GetTile());
logical_tile->AddColumns(tile_group, column_ids_);
logical_tile->AddPositionList(std::move(position_list));

LOG_TRACE("Hybrid executor, Seq Scan :: Got a logical tile");
SetOutput(logical_tile.release());

return true;
}

Expand All @@ -252,22 +276,35 @@ bool HybridScanExecutor::IndexScanUtil() {
}

bool HybridScanExecutor::DExecute() {
if (type_ == planner::SEQ) {

// SEQUENTIAL SCAN
if (type_ == HYBRID_SCAN_TYPE_SEQUENTIAL) {
LOG_TRACE("Sequential Scan");
return SeqScanUtil();
} else if (type_ == planner::INDEX) {
// LOG_TRACE("Hybrrd Scan executor, Index Scan :: 0 child");
assert(children_.size() == 0);
if (!index_done_) {
}
// INDEX SCAN
else if (type_ == HYBRID_SCAN_TYPE_INDEX) {
LOG_TRACE("Index Scan");
PL_ASSERT(children_.size() == 0);

if (index_done_ == false) {
if (index_->GetIndexType() == INDEX_CONSTRAINT_TYPE_PRIMARY_KEY) {
auto status = ExecPrimaryIndexLookup();
if (status == false) return false;
} else {
if (status == false) {
return false;
}
}
else {
return false;
}
}

return IndexScanUtil();
} else {
}
// HYBRID SCAN
else if (type_ == HYBRID_SCAN_TYPE_HYBRID) {
LOG_TRACE("Hybrid Scan");

// do two part search
if (index_done_ == false) {
// Timer<> timer;
Expand All @@ -286,10 +323,15 @@ bool HybridScanExecutor::DExecute() {
// Scan seq
return SeqScanUtil();
}
// FALLBACK
else {
throw Exception("Invalid hybrid scan type : " + std::to_string(type_));
}

}

bool HybridScanExecutor::ExecPrimaryIndexLookup() {
assert(!index_done_);
PL_ASSERT(index_done_ == false);

const planner::HybridScanPlan &node = GetPlanNode<planner::HybridScanPlan>();

Expand All @@ -298,16 +340,21 @@ bool HybridScanExecutor::ExecPrimaryIndexLookup() {

std::vector<ItemPointer *> tuple_location_ptrs;

assert(index_->GetIndexType() == INDEX_CONSTRAINT_TYPE_PRIMARY_KEY);
PL_ASSERT(index_->GetIndexType() == INDEX_CONSTRAINT_TYPE_PRIMARY_KEY);

if (0 == key_column_ids_.size()) {
LOG_TRACE("Scan all keys");
index_->ScanAllKeys(tuple_location_ptrs);
} else {
index_->Scan(values_, key_column_ids_, expr_type_,
SCAN_DIRECTION_TYPE_FORWARD, tuple_location_ptrs);
LOG_TRACE("Scan");
index_->Scan(values_,
key_column_ids_,
expr_type_,
SCAN_DIRECTION_TYPE_FORWARD,
tuple_location_ptrs);
}

LOG_TRACE("Tuple_locations.size(): %lu", tuple_location_ptrs.size());
LOG_TRACE("Result tuple count: %lu", tuple_location_ptrs.size());

auto &transaction_manager =
concurrency::TransactionManagerFactory::GetInstance();
Expand All @@ -317,16 +364,15 @@ bool HybridScanExecutor::ExecPrimaryIndexLookup() {
return false;
}

// std::set<oid_t> oid_ts;

std::map<oid_t, std::vector<oid_t>> visible_tuples;

// for every tuple that is found in the index.
for (auto tuple_location_ptr : tuple_location_ptrs) {
ItemPointer tuple_location = *tuple_location_ptr;

if (type_ == planner::HYBRID && tuple_location.block >= (block_threshold)) {
if (type_ == HYBRID_SCAN_TYPE_HYBRID &&
tuple_location.block >= (block_threshold)) {
item_pointers_.insert(tuple_location);
// oid_ts.insert(tuple_location.block);
}

auto &manager = catalog::Manager::GetInstance();
Expand Down Expand Up @@ -360,19 +406,14 @@ bool HybridScanExecutor::ExecPrimaryIndexLookup() {
// check whether older version is garbage.
if (old_end_cid < max_committed_cid) {
assert(tile_group_header->GetTransactionId(old_item.offset) ==
INITIAL_TXN_ID ||
tile_group_header->GetTransactionId(old_item.offset) ==
INVALID_TXN_ID);
INITIAL_TXN_ID ||
tile_group_header->GetTransactionId(old_item.offset) ==
INVALID_TXN_ID);

if (tile_group_header->SetAtomicTransactionId(
old_item.offset, INVALID_TXN_ID) == true) {
old_item.offset, INVALID_TXN_ID) == true) {
// atomically swap item pointer held in the index bucket.
AtomicUpdateItemPointer(tuple_location_ptr, tuple_location);

// currently, let's assume only primary index exists.
// gc::GCManagerFactory::GetInstance().RecycleTupleSlot(
// table_->GetOid(), old_item.block, old_item.offset,
// max_committed_cid);
}
}

Expand All @@ -388,9 +429,11 @@ bool HybridScanExecutor::ExecPrimaryIndexLookup() {
auto tile_group = manager.GetTileGroup(tuples.first);

std::unique_ptr<LogicalTile> logical_tile(LogicalTileFactory::GetTile());

// Add relevant columns to logical tile
logical_tile->AddColumns(tile_group, full_column_ids_);
logical_tile->AddPositionList(std::move(tuples.second));

if (column_ids_.size() != 0) {
logical_tile->ProjectColumns(full_column_ids_, column_ids_);
}
Expand Down
3 changes: 3 additions & 0 deletions src/executor/index_scan_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ bool IndexScanExecutor::DInit() {
PL_ASSERT(index_ != nullptr);

result_itr_ = START_OID;
result_.clear();
done_ = false;
key_ready_ = false;

auto column_ids_ = node.GetColumnIds();
auto key_column_ids_ = node.GetKeyColumnIds();
Expand Down Expand Up @@ -362,6 +364,7 @@ bool IndexScanExecutor::ExecSecondaryIndexLookup() {
}
}
}

// Construct a logical tile for each block
for (auto tuples : visible_tuples) {
auto &manager = catalog::Manager::GetInstance();
Expand Down
4 changes: 3 additions & 1 deletion src/executor/insert_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace executor {
*/
InsertExecutor::InsertExecutor(const planner::AbstractPlan *node,
ExecutorContext *executor_context)
: AbstractExecutor(node, executor_context) {}
: AbstractExecutor(node, executor_context) {}

/**
* @brief Nothing to init at the moment.
Expand Down Expand Up @@ -98,6 +98,7 @@ bool InsertExecutor::DExecute() {
peloton::Result::RESULT_FAILURE);
return false;
}

auto res = transaction_manager.PerformInsert(location);
if (!res) {
transaction_manager.SetTransactionResult(RESULT_FAILURE);
Expand Down Expand Up @@ -141,6 +142,7 @@ bool InsertExecutor::DExecute() {

// Bulk Insert Mode
for (oid_t insert_itr = 0; insert_itr < bulk_insert_count; insert_itr++) {

// Carry out insertion
ItemPointer location = target_table->InsertTuple(tuple);
LOG_TRACE("Inserted into location: %u, %u", location.block,
Expand Down
Loading

0 comments on commit b6f1239

Please sign in to comment.