From fbb066269505a4dab611e3a428d6d326538c6823 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 9 Mar 2021 16:16:29 +0100 Subject: [PATCH] ARROW-11680: [C++] Add vendored version of folly's spsc queue See issue for rationale / description. To consider: * I based licensing text on other vendored libraries but I'm no expert in how exactly to dot all the i's and cross all the t's. * The constructor can throw if allocation of the ring buffer fails. In general, this sort of queue shouldn't be all that large anyways so this seems unlikely. We could modify the constructor to be private with a factory method that returns `Result` but at the moment I'm not making any changes to the existing code and so I'm hesitant to start doing so for this reason. * ~~There is a test case for the above point but I could take it out if desired.~~ Had to remove test since a massive allocation did not throw on mac. * ~~There is a comparative benchmark for boost. I'm not sure if we want to leave that in?~~ Had to remove benchmark since some build environments did not have boost::lockfree::spsc_queue * I've renamed folly::ProducerConsumerQueue to arrow::util::SpscQueue. I feel the name ProducerConsumerQueue is too vague and could lead to improper usage. This queue is not safe for multiple readers or multiple writers. 
Closes #9519 from westonpace/feature/folly Lead-authored-by: Weston Pace Co-authored-by: Antoine Pitrou Signed-off-by: Antoine Pitrou --- LICENSE.txt | 16 ++ cpp/src/arrow/testing/gtest_util.h | 44 ++++ cpp/src/arrow/util/CMakeLists.txt | 2 + cpp/src/arrow/util/future_test.cc | 44 ---- cpp/src/arrow/util/queue.h | 29 +++ cpp/src/arrow/util/queue_benchmark.cc | 85 +++++++ cpp/src/arrow/util/queue_test.cc | 55 +++++ .../arrow/vendored/ProducerConsumerQueue.h | 217 ++++++++++++++++++ 8 files changed, 448 insertions(+), 44 deletions(-) create mode 100644 cpp/src/arrow/util/queue.h create mode 100644 cpp/src/arrow/util/queue_benchmark.cc create mode 100644 cpp/src/arrow/util/queue_test.cc create mode 100644 cpp/src/arrow/vendored/ProducerConsumerQueue.h diff --git a/LICENSE.txt b/LICENSE.txt index 4c2d96e6496da..1480c1401c06d 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -2119,6 +2119,22 @@ DEALINGS IN THE SOFTWARE. -------------------------------------------------------------------------------- +This project includes code from Folly. + + * cpp/src/arrow/vendored/ProducerConsumerQueue.h + +is based on Folly's + + * folly/Portability.h + * folly/lang/Align.h + * folly/ProducerConsumerQueue.h + +Copyright: Copyright (c) Facebook, Inc. and its affiliates. +Home page: https://github.com/facebook/folly +License: http://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + The file cpp/src/arrow/vendored/musl/strptime.c has the following license Copyright © 2005-2020 Rich Felker, et al. diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index ff3b751a39470..a024be7671d6c 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -521,6 +521,50 @@ void PrintTo(const Result& result, std::ostream* os) { } } +// A data type with only move constructors. 
// A move-only test helper that owns a single heap-allocated int.
//
// Copying is disabled; moving transfers ownership of the int and leaves the
// source with a null `data` pointer.  `moves` records how many moves the
// payload went through to reach this object (the source's count plus one).
struct MoveOnlyDataType {
  explicit MoveOnlyDataType(int x) : data(new int(x)) {}

  MoveOnlyDataType(const MoveOnlyDataType& other) = delete;
  MoveOnlyDataType& operator=(const MoveOnlyDataType& other) = delete;

  // Moves never throw: they only exchange a raw pointer and a counter.
  MoveOnlyDataType(MoveOnlyDataType&& other) noexcept { MoveFrom(&other); }
  MoveOnlyDataType& operator=(MoveOnlyDataType&& other) noexcept {
    MoveFrom(&other);
    return *this;
  }

  ~MoveOnlyDataType() { Destroy(); }

  // Releases the owned int, if any.  When a live payload is released the move
  // counter is reset to -1; an already-empty object is left untouched.
  void Destroy() {
    if (data != nullptr) {
      delete data;
      data = nullptr;
      moves = -1;
    }
  }

  // Takes ownership of `other`'s payload, releasing the current one first.
  void MoveFrom(MoveOnlyDataType* other) {
    // Self-move must be a no-op: releasing our payload before reading
    // `other->data` would otherwise throw away the very value being moved.
    if (other == this) {
      return;
    }
    Destroy();
    data = other->data;
    other->data = nullptr;
    moves = other->moves + 1;
  }

  // Returns the stored value, or the sentinel -42 when the payload is gone.
  int ToInt() const { return data == nullptr ? -42 : *data; }

  // An empty (moved-from) object never compares equal to anything.
  bool operator==(int other) const { return data != nullptr && *data == other; }
  bool operator==(const MoveOnlyDataType& other) const {
    return data != nullptr && other.data != nullptr && *data == *other.data;
  }
  friend bool operator==(int left, const MoveOnlyDataType& right) {
    return right == left;
  }

  int* data = nullptr;
  int moves = 0;
};
a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -65,50 +65,6 @@ struct IterationTraits { static Foo End() { return Foo(-1); } }; -// A data type with only move constructors. -struct MoveOnlyDataType { - explicit MoveOnlyDataType(int x) : data(new int(x)) {} - - MoveOnlyDataType(const MoveOnlyDataType& other) = delete; - MoveOnlyDataType& operator=(const MoveOnlyDataType& other) = delete; - - MoveOnlyDataType(MoveOnlyDataType&& other) { MoveFrom(&other); } - MoveOnlyDataType& operator=(MoveOnlyDataType&& other) { - MoveFrom(&other); - return *this; - } - - ~MoveOnlyDataType() { Destroy(); } - - void Destroy() { - if (data != nullptr) { - delete data; - data = nullptr; - moves = -1; - } - } - - void MoveFrom(MoveOnlyDataType* other) { - Destroy(); - data = other->data; - other->data = nullptr; - moves = other->moves + 1; - } - - int ToInt() const { return data == nullptr ? -42 : *data; } - - bool operator==(int other) const { return data != nullptr && *data == other; } - bool operator==(const MoveOnlyDataType& other) const { - return data != nullptr && other.data != nullptr && *data == *other.data; - } - friend bool operator==(int left, const MoveOnlyDataType& right) { - return right == left; - } - - int* data = nullptr; - int moves = 0; -}; - template <> struct IterationTraits { static MoveOnlyDataType End() { return MoveOnlyDataType(-1); } diff --git a/cpp/src/arrow/util/queue.h b/cpp/src/arrow/util/queue.h new file mode 100644 index 0000000000000..6c71fa6e155e8 --- /dev/null +++ b/cpp/src/arrow/util/queue.h @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/vendored/ProducerConsumerQueue.h" + +namespace arrow { +namespace util { + +template +using SpscQueue = arrow_vendored::folly::ProducerConsumerQueue; + +} +} // namespace arrow diff --git a/cpp/src/arrow/util/queue_benchmark.cc b/cpp/src/arrow/util/queue_benchmark.cc new file mode 100644 index 0000000000000..a476541fd1a74 --- /dev/null +++ b/cpp/src/arrow/util/queue_benchmark.cc @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include + +#include + +#include "arrow/buffer.h" +#include "arrow/util/logging.h" +#include "arrow/util/queue.h" + +namespace arrow { + +namespace util { + +static constexpr int64_t kSize = 100000; + +void Throughput(benchmark::State& state) { + SpscQueue> queue(16); + + std::vector> source; + std::vector> sink; + source.reserve(kSize); + sink.resize(kSize); + const uint8_t data[1] = {0}; + for (int64_t i = 0; i < kSize; i++) { + source.push_back(std::make_shared(data, 1)); + } + + for (auto _ : state) { + std::thread producer([&] { + auto itr = std::make_move_iterator(source.begin()); + auto end = std::make_move_iterator(source.end()); + while (itr != end) { + while (!queue.Write(*itr)) { + } + itr++; + } + }); + + std::thread consumer([&] { + auto itr = sink.begin(); + auto end = sink.end(); + while (itr != end) { + auto next = queue.FrontPtr(); + if (next != nullptr) { + (*itr).swap(*next); + queue.PopFront(); + itr++; + } + } + }); + + producer.join(); + consumer.join(); + std::swap(source, sink); + } + + for (const auto& buf : source) { + ARROW_CHECK(buf && buf->size() == 1); + } + state.SetItemsProcessed(state.iterations() * kSize); +} + +BENCHMARK(Throughput)->UseRealTime(); + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/queue_test.cc b/cpp/src/arrow/util/queue_test.cc new file mode 100644 index 0000000000000..388e4f11ba674 --- /dev/null +++ b/cpp/src/arrow/util/queue_test.cc @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "arrow/testing/gtest_util.h" +#include "arrow/util/queue.h" + +namespace arrow { +namespace util { + +TEST(TestSpscQueue, TestMoveOnly) { + SpscQueue queue(3); + ASSERT_TRUE(queue.IsEmpty()); + ASSERT_FALSE(queue.IsFull()); + ASSERT_EQ(queue.SizeGuess(), 0); + + MoveOnlyDataType in(42); + queue.Write(std::move(in)); + ASSERT_FALSE(queue.IsEmpty()); + ASSERT_FALSE(queue.IsFull()); + ASSERT_EQ(queue.SizeGuess(), 1); + + queue.Write(43); + ASSERT_FALSE(queue.IsEmpty()); + ASSERT_TRUE(queue.IsFull()); + ASSERT_EQ(queue.SizeGuess(), 2); + + MoveOnlyDataType out = std::move(*queue.FrontPtr()); + ASSERT_EQ(42, *out.data); + queue.PopFront(); + ASSERT_TRUE(queue.Read(out)); + ASSERT_EQ(43, *out.data); + + ASSERT_TRUE(queue.IsEmpty()); + ASSERT_FALSE(queue.IsFull()); + ASSERT_EQ(queue.SizeGuess(), 0); +} + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/vendored/ProducerConsumerQueue.h b/cpp/src/arrow/vendored/ProducerConsumerQueue.h new file mode 100644 index 0000000000000..0b7cfa1cb166f --- /dev/null +++ b/cpp/src/arrow/vendored/ProducerConsumerQueue.h @@ -0,0 +1,217 @@ +// Vendored from git tag v2021.02.15.00 + +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
// Vendored from git tag v2021.02.15.00

/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// @author Bo Hu (bhu@fb.com)
// @author Jordan DeLong (delong.j@fb.com)

// This file has been modified as part of Apache Arrow to conform to
// Apache Arrow's coding conventions

#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <new>
#include <type_traits>
#include <utility>

namespace arrow_vendored {
namespace folly {

// Vendored from folly/Portability.h
#if defined(__arm__)
#define FOLLY_ARM 1
#else
#define FOLLY_ARM 0
#endif

#if defined(__s390x__)
#define FOLLY_S390X 1
#else
#define FOLLY_S390X 0
#endif

// Namespace-scope constexpr objects already have internal linkage, so no
// unnamed namespace is needed to keep them local to each translation unit.
constexpr bool kIsArchArm = FOLLY_ARM == 1;
constexpr bool kIsArchS390X = FOLLY_S390X == 1;

// Vendored from folly/lang/Align.h
constexpr std::size_t hardware_destructive_interference_size =
    (kIsArchArm || kIsArchS390X) ? 64 : 128;

/*
 * ProducerConsumerQueue is a one producer and one consumer queue
 * without locks.
 *
 * Elements live in a ring buffer of `size` slots.  Write() is the only
 * method that stores to the write index; Read() and PopFront() are the only
 * ones that store to the read index.
 *
 * NOTE: the slot storage comes from std::malloc, so it is only guaranteed to
 * be aligned for types with fundamental alignment.
 */
template <class T>
struct ProducerConsumerQueue {
  typedef T value_type;

  ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
  ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;

  // size must be >= 2.
  //
  // Also, note that the number of usable slots in the queue at any
  // given time is actually (size-1), so if you start with an empty queue,
  // IsFull() will return true after size-1 insertions.
  //
  // Throws std::bad_alloc if the ring buffer cannot be allocated.
  explicit ProducerConsumerQueue(uint32_t size)
      : size_(size),
        records_(static_cast<T*>(std::malloc(sizeof(T) * size))),
        readIndex_(0),
        writeIndex_(0) {
    assert(size >= 2);
    if (!records_) {
      throw std::bad_alloc();
    }
  }

  ~ProducerConsumerQueue() {
    // We need to destruct anything that may still exist in our queue.
    // (No real synchronization needed at destructor time: only one
    // thread can be doing this.)
    if (!std::is_trivially_destructible<T>::value) {
      size_t readIndex = readIndex_;
      size_t endIndex = writeIndex_;
      while (readIndex != endIndex) {
        records_[readIndex].~T();
        if (++readIndex == size_) {
          readIndex = 0;
        }
      }
    }

    std::free(records_);
  }

  // Constructs a new element in place at the back of the queue from
  // `recordArgs`.  Returns false, without touching the arguments, when the
  // queue is full.
  template <class... Args>
  bool Write(Args&&... recordArgs) {
    auto const currentWrite = writeIndex_.load(std::memory_order_relaxed);
    auto nextRecord = currentWrite + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    if (nextRecord != readIndex_.load(std::memory_order_acquire)) {
      new (&records_[currentWrite]) T(std::forward<Args>(recordArgs)...);
      // Publish the slot only after the element is fully constructed.
      writeIndex_.store(nextRecord, std::memory_order_release);
      return true;
    }

    // queue is full
    return false;
  }

  // move the value at the front of the queue to given variable
  // Returns false (leaving `record` unchanged) when the queue is empty.
  bool Read(T& record) {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return false;
    }

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    record = std::move(records_[currentRead]);
    records_[currentRead].~T();
    // Release the slot only after the element has been destroyed.
    readIndex_.store(nextRecord, std::memory_order_release);
    return true;
  }

  // pointer to the value at the front of the queue (for use in-place) or
  // nullptr if empty.
  T* FrontPtr() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    if (currentRead == writeIndex_.load(std::memory_order_acquire)) {
      // queue is empty
      return nullptr;
    }
    return &records_[currentRead];
  }

  // Destroys the element at the front of the queue and frees its slot.
  // queue must not be empty
  void PopFront() {
    auto const currentRead = readIndex_.load(std::memory_order_relaxed);
    assert(currentRead != writeIndex_.load(std::memory_order_acquire));

    auto nextRecord = currentRead + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    records_[currentRead].~T();
    readIndex_.store(nextRecord, std::memory_order_release);
  }

  bool IsEmpty() const {
    return readIndex_.load(std::memory_order_acquire) ==
           writeIndex_.load(std::memory_order_acquire);
  }

  // The queue is full when advancing the write index would make it collide
  // with the read index.
  bool IsFull() const {
    auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1;
    if (nextRecord == size_) {
      nextRecord = 0;
    }
    return nextRecord == readIndex_.load(std::memory_order_acquire);
  }

  // * If called by consumer, then true size may be more (because producer may
  //   be adding items concurrently).
  // * If called by producer, then true size may be less (because consumer may
  //   be removing items concurrently).
  // * It is undefined to call this from any other thread.
  size_t SizeGuess() const {
    auto const writeIndex = writeIndex_.load(std::memory_order_acquire);
    auto const readIndex = readIndex_.load(std::memory_order_acquire);
    // Stay in unsigned arithmetic: when the write index has wrapped around
    // behind the read index, count the slots the other way round the ring
    // instead of going through a signed intermediate.
    if (writeIndex >= readIndex) {
      return writeIndex - readIndex;
    }
    return size_ - (readIndex - writeIndex);
  }

  // maximum number of items in the queue.
  size_t capacity() const { return size_ - 1; }

 private:
  using AtomicIndex = std::atomic<unsigned int>;

  // The padding keeps the two indices, each written by a different thread,
  // away from each other and from neighbouring objects so they do not share
  // a cache line.
  char pad0_[hardware_destructive_interference_size];
  const uint32_t size_;
  T* const records_;

  AtomicIndex readIndex_;
  char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
  AtomicIndex writeIndex_;

  char pad2_[hardware_destructive_interference_size - sizeof(AtomicIndex)];
};

}  // namespace folly
}  // namespace arrow_vendored