Add Aggregation Merge Operator (facebook#9780)
Summary:
Add a merge operator that allows users to register specific aggregation functions so that they can do aggregation per key using different aggregation types.
See the comments in include/rocksdb/utilities/agg_merge.h for actual usage.

Pull Request resolved: facebook#9780

Test Plan: Add a unit test to cover various cases.

Reviewed By: ltamasi

Differential Revision: D35267444

fbshipit-source-id: 5b02f31c4f3e17e96dd4025cdc49fca8c2868628
siying authored and facebook-github-bot committed Apr 16, 2022
1 parent db536ee commit 4f9c0fd
Showing 12 changed files with 731 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
@@ -816,6 +816,7 @@ set(SOURCES
util/thread_local.cc
util/threadpool_imp.cc
util/xxhash.cc
utilities/agg_merge/agg_merge.cc
utilities/backup/backup_engine.cc
utilities/blob_db/blob_compaction_filter.cc
utilities/blob_db/blob_db.cc
@@ -1335,6 +1336,7 @@ if(WITH_TESTS)
util/thread_list_test.cc
util/thread_local_test.cc
util/work_queue_test.cc
utilities/agg_merge/agg_merge_test.cc
utilities/backup/backup_engine_test.cc
utilities/blob_db/blob_db_test.cc
utilities/cassandra/cassandra_functional_test.cc
@@ -1370,6 +1372,7 @@ if(WITH_TESTS)
db/db_test_util.cc
monitoring/thread_status_updater_debug.cc
table/mock_table.cc
utilities/agg_merge/test_agg_merge.cc
utilities/cassandra/test_utils.cc
)
enable_testing()
1 change: 1 addition & 0 deletions HISTORY.md
@@ -21,6 +21,7 @@
* Add event listener support on remote compaction compactor side.
* Added a dedicated integer DB property `rocksdb.live-blob-file-garbage-size` that exposes the total amount of garbage in the blob files in the current version.
* RocksDB does internal auto prefetching if it notices sequential reads. It starts with readahead size `initial_auto_readahead_size` which now can be configured through BlockBasedTableOptions.
* Add a merge operator that allows users to register specific aggregation functions so that they can do aggregation using different aggregation types for different keys. See comments in include/rocksdb/utilities/agg_merge.h for actual usage. The feature is experimental; the format is subject to change and we won't provide a migration tool.

### Behavior changes
* Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794).
3 changes: 3 additions & 0 deletions Makefile
@@ -1380,6 +1380,9 @@ ribbon_test: $(OBJ_DIR)/util/ribbon_test.o $(TEST_LIBRARY) $(LIBRARY)
option_change_migration_test: $(OBJ_DIR)/utilities/option_change_migration/option_change_migration_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

agg_merge_test: $(OBJ_DIR)/utilities/agg_merge/agg_merge_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

stringappend_test: $(OBJ_DIR)/utilities/merge_operators/string_append/stringappend_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

9 changes: 9 additions & 0 deletions TARGETS
@@ -245,6 +245,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/xxhash.cc",
"utilities/agg_merge/agg_merge.cc",
"utilities/backup/backup_engine.cc",
"utilities/blob_db/blob_compaction_filter.cc",
"utilities/blob_db/blob_db.cc",
@@ -563,6 +564,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/xxhash.cc",
"utilities/agg_merge/agg_merge.cc",
"utilities/backup/backup_engine.cc",
"utilities/blob_db/blob_compaction_filter.cc",
"utilities/blob_db/blob_db.cc",
@@ -652,6 +654,7 @@ cpp_library_wrapper(name="rocksdb_test_lib", srcs=[
"test_util/testutil.cc",
"tools/block_cache_analyzer/block_cache_trace_analyzer.cc",
"tools/trace_analyzer_tool.cc",
"utilities/agg_merge/test_agg_merge.cc",
"utilities/cassandra/test_utils.cc",
], deps=[":rocksdb_lib"], headers=None, link_whole=False, extra_test_libs=True)

@@ -4698,6 +4701,12 @@ fancy_bench_wrapper(suite_name="rocksdb_microbench_suite_14_slow", binary_to_ben
# Do not build the tests in opt mode, since SyncPoint and other test code
# will not be included.

cpp_unittest_wrapper(name="agg_merge_test",
srcs=["utilities/agg_merge/agg_merge_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="arena_test",
srcs=["memory/arena_test.cc"],
deps=[":rocksdb_test_lib"],
2 changes: 2 additions & 0 deletions include/rocksdb/db.h
@@ -10,11 +10,13 @@

#include <stdint.h>
#include <stdio.h>

#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "rocksdb/iterator.h"
#include "rocksdb/listener.h"
#include "rocksdb/metadata.h"
138 changes: 138 additions & 0 deletions include/rocksdb/utilities/agg_merge.h
@@ -0,0 +1,138 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <string>
#include <vector>

#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"

namespace ROCKSDB_NAMESPACE {
// The feature is still in development so the encoding format is subject
// to change.
//
// Aggregation Merge Operator is a merge operator that allows users to
// aggregate merge operands of different keys with different registered
// aggregation functions. The aggregation can also change for the same
// key if the functions store the data in the same format.
// The target application highly overlaps with that of merge operators in
// general, but we try to provide a better interface so that users are more
// likely to use pre-implemented plug-in functions and connect with existing
// third-party aggregation functions (such as those from SQL engines).
// In this case, the need for users to write customized C++ plug-in code
// is reduced.
// If the idea proves to be useful, we might consider making it a core
// functionality of RocksDB and reducing the support of merge operators.
//
// Users can implement aggregation functions by implementing abstract
// class Aggregator, and register it using AddAggregator().
// The merge operator can be retrieved from GetAggMergeOperator() and
// it is a singleton.
//
// Users can push values to be updated with a merge operand encoded with
// registered function name and payload using EncodeAggFuncAndPayload(),
// and the merge operator will invoke the aggregation function.
// An example:
//
// // Assume class ExampleSumAggregator is implemented to do simple sum.
// AddAggregator("sum", std::make_unique<ExampleSumAggregator>());
// std::shared_ptr<MergeOperator> mp_guard = GetAggMergeOperator();
// options.merge_operator = mp_guard;
// ...... // Creating DB
//
//
// std::string encoded_value;
// Status s = EncodeAggFuncAndPayload(kUnnamedFuncName, "200", encoded_value);
// assert(s.ok());
// db->Put(WriteOptions(), "foo", encoded_value);
// s = EncodeAggFuncAndPayload("sum", "200", encoded_value);
// assert(s.ok());
// db->Merge(WriteOptions(), "foo", encoded_value);
// s = EncodeAggFuncAndPayload("sum", "200", encoded_value);
// assert(s.ok());
// db->Merge(WriteOptions(), "foo", encoded_value);
//
// std::string value;
// s = db->Get(ReadOptions(), "foo", &value);
// assert(s.ok());
// Slice func, aggregated_value;
// assert(ExtractAggFuncAndValue(value, func, aggregated_value));
// assert(func == "sum");
// assert(aggregated_value == "600");
//
//
// DB::Put() can also be used to add a payload in the same way as Merge().
//
// kUnnamedFuncName can be used as a placeholder function name. Such an
// operand will be aggregated with merge operands inserted later, based on
// the function name given in those operands.
//
// If the aggregation function is not registered, or the aggregation
// function returns an error, the result will be encoded with a special
// function name, kErrorFuncName, with the merge operands encoded into a
// list that can be extracted using ExtractList().
//
// If users add a merge operand using a different aggregation function from
// the previous one, the merge operands for the previous one are aggregated
// first, and the payload part of the result is treated as the first payload
// of the items for the new aggregation function. For example, users can
// call Merge("plus, 1"), Merge("plus, 2"), Merge("minus, 3"), and the
// aggregation result would be "minus, 0".
//

// A class used to aggregate data per key. A plug-in function is
// implemented by subclassing Aggregator and registered using
// AddAggregator(); it is then used by the merge operator returned
// by GetAggMergeOperator().
class Aggregator {
public:
virtual ~Aggregator() {}
// The input list is in reverse insertion order, with values[0] being the
// one inserted last and values.back() the one inserted first.
// The oldest one might be from Get().
// Return whether aggregation succeeded. False for aggregation error.
virtual bool Aggregate(const std::vector<Slice>& values,
std::string& result) const = 0;

// True if a partial aggregation should be invoked. Some aggregators
// might opt to skip partial aggregation if possible.
virtual bool DoPartialAggregate() const { return true; }
};

// Adds an aggregation plug-in under the given function name. The plug-in
// is used by all aggregation merge operators returned by
// GetAggMergeOperator(). It is currently not thread-safe to run this
// concurrently with the aggregation merge operator, so it is recommended
// that all aggregation functions be added before calling
// GetAggMergeOperator().
Status AddAggregator(const std::string& function_name,
std::unique_ptr<Aggregator>&& agg);

// Get the singleton instance of the merge operator for aggregation.
// The same instance is always returned, with a shared_ptr held as a
// static variable by the function.
// This is done because options.merge_operator is a shared_ptr.
std::shared_ptr<MergeOperator> GetAggMergeOperator();

// Encode an aggregation function name and payload into a value that can be
// consumed by the aggregation merge operator.
Status EncodeAggFuncAndPayload(const Slice& function_name, const Slice& payload,
std::string& output);
// Helper function to extract aggregation function name and payload.
// Return false if it fails to decode.
bool ExtractAggFuncAndValue(const Slice& op, Slice& func, Slice& value);

// Extract encoded list. This can be used to extract error merge operands when
// the returned function name is kErrorFuncName.
bool ExtractList(const Slice& encoded_list, std::vector<Slice>& decoded_list);

// Special placeholder function name, allowing the operand to be aggregated
// using the aggregation type of subsequent merge operands.
extern const std::string kUnnamedFuncName;

// Special error function name reserved for merging or aggregation error.
extern const std::string kErrorFuncName;

} // namespace ROCKSDB_NAMESPACE
3 changes: 3 additions & 0 deletions src.mk
@@ -232,6 +232,7 @@ LIB_SOURCES = \
util/thread_local.cc \
util/threadpool_imp.cc \
util/xxhash.cc \
utilities/agg_merge/agg_merge.cc \
utilities/backup/backup_engine.cc \
utilities/blob_db/blob_compaction_filter.cc \
utilities/blob_db/blob_db.cc \
@@ -364,6 +365,7 @@ TEST_LIB_SOURCES = \
test_util/mock_time_env.cc \
test_util/testharness.cc \
test_util/testutil.cc \
utilities/agg_merge/test_agg_merge.cc \
utilities/cassandra/test_utils.cc \

FOLLY_SOURCES = \
@@ -559,6 +561,7 @@ TEST_MAIN_SOURCES = \
util/thread_list_test.cc \
util/thread_local_test.cc \
util/work_queue_test.cc \
utilities/agg_merge/agg_merge_test.cc \
utilities/backup/backup_engine_test.cc \
utilities/blob_db/blob_db_test.cc \
utilities/cassandra/cassandra_format_test.cc \

