From 1934706ca902b2274610abc10d720e6622b8548d Mon Sep 17 00:00:00 2001
From: Ivan Lezhankin
Date: Thu, 16 Jan 2020 15:37:29 +0300
Subject: [PATCH] [WIP]

---
 .clang-format                             |   2 +-
 dbms/src/Common/Allocator.h               |  22 +--
 dbms/src/Common/MemoryTracker.cpp         |  27 ++--
 dbms/src/Common/QueryProfiler.cpp         |  80 ++---------
 dbms/src/Common/QueryProfiler.h           |   7 -
 dbms/src/Common/TraceCollector.cpp        | 159 ++++++++++++++++++----
 dbms/src/Common/TraceCollector.h          |  29 +++-
 dbms/src/Common/new_delete.cpp            |  26 ++--
 dbms/src/Interpreters/Context.cpp         |   6 +-
 dbms/src/Interpreters/TraceLog.cpp        |   6 +-
 dbms/src/Interpreters/TraceLog.h          |   9 +-
 libs/libcommon/include/common/singleton.h |  44 ++++++
 12 files changed, 269 insertions(+), 148 deletions(-)
 create mode 100644 libs/libcommon/include/common/singleton.h

diff --git a/.clang-format b/.clang-format
index 6af78340db1e..8416ba8ce726 100644
--- a/.clang-format
+++ b/.clang-format
@@ -52,12 +52,12 @@ IncludeCategories:
 ReflowComments: false
 AlignEscapedNewlinesLeft: false
 AlignEscapedNewlines: DontAlign
+AlignTrailingComments: true

 # Not changed:
 AccessModifierOffset: -4
 AlignConsecutiveAssignments: false
 AlignOperands: false
-AlignTrailingComments: false
 AllowAllParametersOfDeclarationOnNextLine: true
 AllowShortBlocksOnASingleLine: false
 AllowShortCaseLabelsOnASingleLine: false

diff --git a/dbms/src/Common/Allocator.h b/dbms/src/Common/Allocator.h
index 5d39d3272433..513e62f45750 100644
--- a/dbms/src/Common/Allocator.h
+++ b/dbms/src/Common/Allocator.h
@@ -93,8 +93,9 @@ class Allocator
     /// Allocate memory range.
     void * alloc(size_t size, size_t alignment = 0)
    {
+        auto * ptr = allocNoTrack(size, alignment);
         CurrentMemoryTracker::alloc(size);
-        return allocNoTrack(size, alignment);
+        return ptr;
     }

     /// Free memory range.
@@ -118,13 +119,13 @@ class Allocator
         else if (old_size < MMAP_THRESHOLD && new_size < MMAP_THRESHOLD && alignment <= MALLOC_MIN_ALIGNMENT)
         {
-            /// Resize malloc'd memory region with no special alignment requirement.
-            CurrentMemoryTracker::realloc(old_size, new_size);
-
             void * new_buf = ::realloc(buf, new_size);
             if (nullptr == new_buf)
                 DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);

+            /// Resize malloc'd memory region with no special alignment requirement.
+            CurrentMemoryTracker::realloc(old_size, new_size);
+
             buf = new_buf;
             if constexpr (clear_memory)
                 if (new_size > old_size)
@@ -132,25 +133,26 @@ class Allocator
         }
         else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD)
         {
-            /// Resize mmap'd memory region.
-            CurrentMemoryTracker::realloc(old_size, new_size);
-
             // On apple and freebsd self-implemented mremap used (common/mremap.h)
             buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
             if (MAP_FAILED == buf)
                 DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);

+            /// Resize mmap'd memory region.
+            CurrentMemoryTracker::realloc(old_size, new_size);
+
             /// No need for zero-fill, because mmap guarantees it.
         }
         else if (new_size < MMAP_THRESHOLD)
         {
-            /// Small allocs that requires a copy. Assume there's enough memory in system. Call CurrentMemoryTracker once.
-            CurrentMemoryTracker::realloc(old_size, new_size);
-
             void * new_buf = allocNoTrack(new_size, alignment);
             memcpy(new_buf, buf, std::min(old_size, new_size));
             freeNoTrack(buf, old_size);
+
+            /// Small allocs that requires a copy. Assume there's enough memory in system. Call CurrentMemoryTracker once.
+            CurrentMemoryTracker::realloc(old_size, new_size);
+
             buf = new_buf;
         }
         else

diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp
index b3d661d95eee..598998787657 100644
--- a/dbms/src/Common/MemoryTracker.cpp
+++ b/dbms/src/Common/MemoryTracker.cpp
@@ -1,12 +1,15 @@
-#include
-
 #include "MemoryTracker.h"
-#include
-#include
+
+#include
+#include "Common/TraceCollector.h"
+#include
 #include
 #include
-#include
-#include
+#include
+#include
+#include
+
+#include


 namespace DB
@@ -73,7 +76,7 @@ void MemoryTracker::alloc(Int64 size)
         return;

     /** Using memory_order_relaxed means that if allocations are done simultaneously,
-      *  we allow exception about memory limit exceeded to be thrown only on next allocation.
+      * we allow exception about memory limit exceeded to be thrown only on next allocation.
       * So, we allow over-allocations.
       */
     Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
@@ -207,10 +210,13 @@ namespace CurrentMemoryTracker
         if (untracked > untracked_memory_limit)
         {
             /// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
-            /// more. It could be usefull for enlarge Exception message in rethrow logic.
+            /// more. It could be useful to enlarge Exception message in rethrow logic.
             Int64 tmp = untracked;
             untracked = 0;
             memory_tracker->alloc(tmp);
+
+            auto no_track = memory_tracker->blocker.cancel();
+            Singleton<DB::TraceCollector>()->collect(tmp);
         }
     }
 }
@@ -218,10 +224,7 @@ namespace CurrentMemoryTracker
     void realloc(Int64 old_size, Int64 new_size)
     {
         Int64 addition = new_size - old_size;
-        if (addition > 0)
-            alloc(addition);
-        else
-            free(-addition);
+        addition > 0 ? alloc(addition) : free(-addition);
     }

     void free(Int64 size)

diff --git a/dbms/src/Common/QueryProfiler.cpp b/dbms/src/Common/QueryProfiler.cpp
index ea2638f85445..8ca4c299b2ec 100644
--- a/dbms/src/Common/QueryProfiler.cpp
+++ b/dbms/src/Common/QueryProfiler.cpp
@@ -1,92 +1,38 @@
 #include "QueryProfiler.h"

-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
+#include
 #include
+#include
+#include
 #include
-#include
-#include
+#include
+#include
+#include
+#include
+#include
+#include

-namespace ProfileEvents
-{
-    extern const Event QueryProfilerSignalOverruns;
-}

 namespace DB
 {

-extern LazyPipeFDs trace_pipe;
-
 namespace
 {
-    /// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
-    /// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
-    constexpr size_t QUERY_ID_MAX_LEN = 1024;
-
-#if defined(OS_LINUX)
-    thread_local size_t write_trace_iteration = 0;
-#endif
-
     void writeTraceInfo(TraceType trace_type, int /* sig */, siginfo_t * info, void * context)
     {
+        int overrun_count = 0;
 #if defined(OS_LINUX)
-        /// Quickly drop if signal handler is called too frequently.
-        /// Otherwise we may end up infinitelly processing signals instead of doing any useful work.
-        ++write_trace_iteration;
-        if (info && info->si_overrun > 0)
-        {
-            /// But pass with some frequency to avoid drop of all traces.
-            if (write_trace_iteration % info->si_overrun == 0)
-            {
-                ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun);
-            }
-            else
-            {
-                ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, info->si_overrun + 1);
-                return;
-            }
-        }
+        if (info)
+            overrun_count = info->si_overrun;
 #else
         UNUSED(info);
 #endif

-        constexpr size_t buf_size = sizeof(char) +  // TraceCollector stop flag
-            8 * sizeof(char) +                      // maximum VarUInt length for string size
-            QUERY_ID_MAX_LEN * sizeof(char) +       // maximum query_id length
-            sizeof(UInt8) +                         // number of stack frames
-            sizeof(StackTrace::Frames) +            // collected stack trace, maximum capacity
-            sizeof(TraceType) +                     // timer type
-            sizeof(UInt32);                         // thread_number
-        char buffer[buf_size];
-        WriteBufferFromFileDescriptorDiscardOnFailure out(trace_pipe.fds_rw[1], buf_size, buffer);
-
-        StringRef query_id = CurrentThread::getQueryId();
-        query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
-
-        UInt32 thread_number = CurrentThread::get().thread_number;
-
         const auto signal_context = *reinterpret_cast<ucontext_t *>(context);
         const StackTrace stack_trace(signal_context);

-        writeChar(false, out);
-        writeStringBinary(query_id, out);
-
-        size_t stack_trace_size = stack_trace.getSize();
-        size_t stack_trace_offset = stack_trace.getOffset();
-        writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
-        for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
-            writePODBinary(stack_trace.getFrames()[i], out);
-
-        writePODBinary(trace_type, out);
-        writePODBinary(thread_number, out);
-        out.next();
+        Singleton<TraceCollector>()->collect(trace_type, stack_trace, overrun_count);
     }

     [[maybe_unused]] const UInt32 TIMER_PRECISION = 1e9;

diff --git a/dbms/src/Common/QueryProfiler.h b/dbms/src/Common/QueryProfiler.h
index dbe65b068989..075a8f41c66f 100644
--- a/dbms/src/Common/QueryProfiler.h
+++ b/dbms/src/Common/QueryProfiler.h
@@ -15,13 +15,6 @@ namespace Poco

 namespace DB
 {
-enum class TraceType : UInt8
-{
-    REAL_TIME,
-    CPU_TIME,
-    MEMORY,
-};
-
 /**
   * Query profiler implementation for selected thread.
   *

diff --git a/dbms/src/Common/TraceCollector.cpp b/dbms/src/Common/TraceCollector.cpp
index 6aeae2fb9f38..09ae0bc15d28 100644
--- a/dbms/src/Common/TraceCollector.cpp
+++ b/dbms/src/Common/TraceCollector.cpp
@@ -1,25 +1,38 @@
 #include "TraceCollector.h"

 #include
+#include
+#include
+#include
+#include
+#include
+#include
 #include
+#include
 #include
 #include
 #include
-#include
-#include
-#include
-#include
-#include
-#include
 #include
 #include

+namespace ProfileEvents
+{
+    extern const Event QueryProfilerSignalOverruns;
+}
+
 namespace DB
 {

-LazyPipeFDs trace_pipe;
+namespace
+{
+    /// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
+    /// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
+    constexpr size_t QUERY_ID_MAX_LEN = 1024;
+
+    thread_local size_t write_trace_iteration = 0;
+}

 namespace ErrorCodes
 {
@@ -27,20 +40,15 @@ namespace ErrorCodes
     extern const int THREAD_IS_NOT_JOINABLE;
 }

-TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log_)
-    : log(&Poco::Logger::get("TraceCollector"))
-    , trace_log(trace_log_)
+TraceCollector::TraceCollector() : log(&Poco::Logger::get("TraceCollector"))
 {
-    if (trace_log == nullptr)
-        throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
-
-    trace_pipe.open();
+    pipe.open();

     /** Turn write end of pipe to non-blocking mode to avoid deadlocks
       * when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
       */
-    trace_pipe.setNonBlocking();
-    trace_pipe.tryIncreaseSize(1 << 20);
+    pipe.setNonBlocking();
+    pipe.tryIncreaseSize(1 << 20);

     thread = ThreadFromGlobalPool(&TraceCollector::run, this);
 }
@@ -51,11 +59,100 @@ TraceCollector::~TraceCollector()
         LOG_ERROR(log, "TraceCollector thread is malformed and cannot be joined");
     else
     {
-        TraceCollector::notifyToStop();
+        stop();
         thread.join();
     }

-    trace_pipe.close();
+    pipe.close();
+}
+
+void TraceCollector::collect(TraceType trace_type, const StackTrace & stack_trace, int overrun_count)
+{
+    /// Quickly drop if signal handler is called too frequently.
+    /// Otherwise we may end up infinitely processing signals instead of doing any useful work.
+    ++write_trace_iteration;
+    if (overrun_count)
+    {
+        /// But pass with some frequency to avoid drop of all traces.
+        if (write_trace_iteration % overrun_count == 0)
+        {
+            ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count);
+        }
+        else
+        {
+            ProfileEvents::increment(ProfileEvents::QueryProfilerSignalOverruns, overrun_count + 1);
+            return;
+        }
+    }
+
+    constexpr size_t buf_size = sizeof(char) +  // TraceCollector stop flag
+        8 * sizeof(char) +                      // maximum VarUInt length for string size
+        QUERY_ID_MAX_LEN * sizeof(char) +       // maximum query_id length
+        sizeof(UInt8) +                         // number of stack frames
+        sizeof(StackTrace::Frames) +            // collected stack trace, maximum capacity
+        sizeof(TraceType) +                     // trace type
+        sizeof(UInt32) +                        // thread_number
+        sizeof(Int64) +                         // size
+        sizeof(UInt64);                         // pointer
+    char buffer[buf_size];
+    WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
+
+    StringRef query_id = CurrentThread::getQueryId();
+    query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
+
+    UInt32 thread_number = CurrentThread::get().thread_number;
+
+    writeChar(false, out);
+    writeStringBinary(query_id, out);
+
+    size_t stack_trace_size = stack_trace.getSize();
+    size_t stack_trace_offset = stack_trace.getOffset();
+    writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
+    for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
+        writePODBinary(stack_trace.getFrames()[i], out);
+
+    writePODBinary(trace_type, out);
+    writePODBinary(thread_number, out);
+    writePODBinary(Int64(0), out);   /// size (reader expects a full Int64)
+    writePODBinary(UInt64(0), out);  /// pointer (reader expects a full UInt64)
+
+    out.next();
+}
+
+void TraceCollector::collect(UInt64 size)
+{
+    constexpr size_t buf_size = sizeof(char) +  // TraceCollector stop flag
+        8 * sizeof(char) +                      // maximum VarUInt length for string size
+        QUERY_ID_MAX_LEN * sizeof(char) +       // maximum query_id length
+        sizeof(UInt8) +                         // number of stack frames
+        sizeof(StackTrace::Frames) +            // collected stack trace, maximum capacity
+        sizeof(TraceType) +                     // trace type
+        sizeof(UInt32) +                        // thread_number
+        sizeof(UInt64) +                        // size
+        sizeof(UInt64);                         // pointer
+    char buffer[buf_size];
+    WriteBufferFromFileDescriptorDiscardOnFailure out(pipe.fds_rw[1], buf_size, buffer);
+
+    StringRef query_id = CurrentThread::getQueryId();
+    query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
+
+    UInt32 thread_number = CurrentThread::get().thread_number;
+
+    writeChar(false, out);
+    writeStringBinary(query_id, out);
+
+    const auto & stack_trace = StackTrace();
+
+    size_t stack_trace_size = stack_trace.getSize();
+    size_t stack_trace_offset = stack_trace.getOffset();
+    writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
+    for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
+        writePODBinary(stack_trace.getFrames()[i], out);
+
+    writePODBinary(TraceType::MEMORY, out);
+    writePODBinary(thread_number, out);
+    writePODBinary(size, out);
+    writePODBinary(UInt64(0), out);  /// pointer (reader always expects it)
+
+    out.next();
 }

 /**
   * Sends TraceCollector stop message
   *
   * Each sequence of data for TraceCollector thread starts with a boolean flag.
   * If this flag is true, TraceCollector must stop reading trace_pipe and exit.
   * This function sends flag with a true value to stop TraceCollector gracefully.
   *
   * NOTE: TraceCollector will NOT stop immediately as there may be some data left in the pipe
   * before stop message.
   */
-void TraceCollector::notifyToStop()
+void TraceCollector::stop()
 {
-    WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1]);
+    WriteBufferFromFileDescriptor out(pipe.fds_rw[1]);
     writeChar(true, out);
     out.next();
 }

 void TraceCollector::run()
 {
-    ReadBufferFromFileDescriptor in(trace_pipe.fds_rw[0]);
+    ReadBufferFromFileDescriptor in(pipe.fds_rw[0]);
+
+    /// FIXME: use condvar to wait for |trace_log|
+    while (!trace_log)
+        sleep(1);

     while (true)
     {
@@ -89,13 +190,13 @@ void TraceCollector::run()
         std::string query_id;
         readStringBinary(query_id, in);

-        UInt8 size = 0;
-        readIntBinary(size, in);
+        UInt8 trace_size = 0;
+        readIntBinary(trace_size, in);

         Array trace;
-        trace.reserve(size);
+        trace.reserve(trace_size);

-        for (size_t i = 0; i < size; i++)
+        for (size_t i = 0; i < trace_size; i++)
         {
             uintptr_t addr = 0;
             readPODBinary(addr, in);
@@ -108,7 +209,13 @@ void TraceCollector::run()
         UInt32 thread_number;
         readPODBinary(thread_number, in);

-        TraceLogElement element{std::time(nullptr), trace_type, thread_number, query_id, trace};
+        Int64 size;
+        readPODBinary(size, in);
+
+        UInt64 pointer;
+        readPODBinary(pointer, in);
+
+        TraceLogElement element{std::time(nullptr), trace_type, thread_number, query_id, trace, size, pointer};
         trace_log->add(element);
     }
 }

diff --git a/dbms/src/Common/TraceCollector.h b/dbms/src/Common/TraceCollector.h
index 5d1b37753563..f095e64332bd 100644
--- a/dbms/src/Common/TraceCollector.h
+++ b/dbms/src/Common/TraceCollector.h
@@ -1,7 +1,10 @@
 #pragma once

+#include "Common/PipeFDs.h"
 #include

+class StackTrace;
+
 namespace Poco
 {
     class Logger;
@@ -12,21 +15,33 @@ namespace DB

 class TraceLog;

+enum class TraceType : UInt8
+{
+    REAL_TIME,
+    CPU_TIME,
+    MEMORY,
+};
+
 class TraceCollector
 {
+public:
+    TraceCollector();
+    ~TraceCollector();
+
+    void setTraceLog(const std::shared_ptr<TraceLog> & trace_log_) { trace_log = trace_log_; }
+
+    void collect(TraceType type, const StackTrace & stack_trace, int overrun_count = 0);
+    void collect(UInt64 size);
+
+    void stop();
+
 private:
     Poco::Logger * log;
     std::shared_ptr<TraceLog> trace_log;
     ThreadFromGlobalPool thread;
+    LazyPipeFDs pipe;

     void run();
-
-    static void notifyToStop();
-
-public:
-    TraceCollector(std::shared_ptr<TraceLog> & trace_log_);
-
-    ~TraceCollector();
 };

 }

diff --git a/dbms/src/Common/new_delete.cpp b/dbms/src/Common/new_delete.cpp
index cbf9b93290ee..9791a53470cc 100644
--- a/dbms/src/Common/new_delete.cpp
+++ b/dbms/src/Common/new_delete.cpp
@@ -1,14 +1,16 @@
-#if defined(OS_LINUX)
-#include
-#elif defined(OS_DARWIN)
-#include
-#endif
-#include
-
 #include
 #include
 #include
+#include
+#include
+
+#if defined(OS_LINUX)
+# include
+#elif defined(OS_DARWIN)
+# include
+#endif
+

 /// Replace default new/delete with memory tracking versions.
 /// @sa https://en.cppreference.com/w/cpp/memory/new/operator_new
 ///     https://en.cppreference.com/w/cpp/memory/new/operator_delete
@@ -29,7 +31,7 @@ ALWAYS_INLINE void trackMemory(std::size_t size)
 #endif
 }

-ALWAYS_INLINE bool trackMemoryNoExept(std::size_t size) noexcept
+ALWAYS_INLINE bool trackMemoryNoExcept(std::size_t size) noexcept
 {
     try
     {
@@ -54,11 +56,11 @@ ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [
 #else
         if (size)
             CurrentMemoryTracker::free(size);
-#ifdef _GNU_SOURCE
+# ifdef _GNU_SOURCE
         /// It's innaccurate resource free for sanitizers. malloc_usable_size() result is greater or equal to allocated size.
         else
             CurrentMemoryTracker::free(malloc_usable_size(ptr));
-#endif
+# endif
 #endif
     }
     catch (...)
@@ -83,14 +85,14 @@ void * operator new[](std::size_t size)

 void * operator new(std::size_t size, const std::nothrow_t &) noexcept
 {
-    if (likely(Memory::trackMemoryNoExept(size)))
+    if (likely(Memory::trackMemoryNoExcept(size)))
         return Memory::newNoExept(size);
     return nullptr;
 }

 void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
 {
-    if (likely(Memory::trackMemoryNoExept(size)))
+    if (likely(Memory::trackMemoryNoExcept(size)))
         return Memory::newNoExept(size);
     return nullptr;
 }

diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp
index 66ce18aa2c41..1ed64ab0c697 100644
--- a/dbms/src/Interpreters/Context.cpp
+++ b/dbms/src/Interpreters/Context.cpp
@@ -55,6 +55,7 @@
 #include
 #include
 #include
+#include

 namespace ProfileEvents
 {
@@ -298,8 +299,7 @@ struct ContextShared
         schedule_pool.reset();
         ddl_worker.reset();

-        /// Stop trace collector if any
-        trace_collector.reset();
+        Singleton<TraceCollector>()->stop();
     }

     bool hasTraceCollector()
@@ -312,7 +312,7 @@ struct ContextShared
         if (trace_log == nullptr)
             return;

-        trace_collector = std::make_unique<TraceCollector>(trace_log);
+        Singleton<TraceCollector>()->setTraceLog(trace_log);
     }

 private:

diff --git a/dbms/src/Interpreters/TraceLog.cpp b/dbms/src/Interpreters/TraceLog.cpp
index 577bc6e22cc2..123664ee9fa9 100644
--- a/dbms/src/Interpreters/TraceLog.cpp
+++ b/dbms/src/Interpreters/TraceLog.cpp
@@ -27,7 +27,9 @@ Block TraceLogElement::createBlock()
         {std::make_shared<DataTypeEnum8>(trace_values), "trace_type"},
         {std::make_shared<DataTypeUInt32>(), "thread_number"},
         {std::make_shared<DataTypeString>(), "query_id"},
-        {std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"}
+        {std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()), "trace"},
+        {std::make_shared<DataTypeInt64>(), "size"},
+        {std::make_shared<DataTypeUInt64>(), "pointer"},
     };
 }
@@ -44,6 +46,8 @@ void TraceLogElement::appendToBlock(Block & block) const
     columns[i++]->insert(thread_number);
     columns[i++]->insertData(query_id.data(), query_id.size());
     columns[i++]->insert(trace);
+    columns[i++]->insert(size);
+    columns[i++]->insert(pointer);

     block.setColumns(std::move(columns));
 }

diff --git a/dbms/src/Interpreters/TraceLog.h b/dbms/src/Interpreters/TraceLog.h
index 52583bd996ac..edf2a090947f 100644
--- a/dbms/src/Interpreters/TraceLog.h
+++ b/dbms/src/Interpreters/TraceLog.h
@@ -1,9 +1,10 @@
 #pragma once

-#include
-#include
 #include
+#include
 #include
+#include
+#include

 namespace DB
 {
@@ -19,6 +20,10 @@ struct TraceLogElement
     String query_id;
     Array trace;

+    /// for |TraceType::MEMORY|
+    Int64 size;     /// Allocation size in bytes. In case of deallocation should match the allocation size.
+    UInt64 pointer; /// Address of allocated region - to track the deallocations.
+
     static std::string name() { return "TraceLog"; }
     static Block createBlock();
     void appendToBlock(Block & block) const;

diff --git a/libs/libcommon/include/common/singleton.h b/libs/libcommon/include/common/singleton.h
new file mode 100644
index 000000000000..9dd3c02ef87a
--- /dev/null
+++ b/libs/libcommon/include/common/singleton.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <memory>
+#include <type_traits>
+
+template <typename T, typename Enable = void>
+class Singleton;
+
+/// For default-constructible type we don't need to implement |create()|
+/// and may use "arrow" operator immediately.
+template <typename T>
+class Singleton<T, std::enable_if_t<std::is_default_constructible_v<T>>>
+{
+public:
+    T * operator->()
+    {
+        static T instance;
+        return &instance;
+    }
+};
+
+/// For custom-constructed type we have to enforce call to |create()|
+/// before any use of "arrow" operator.
+template <typename T>
+class Singleton<T, std::enable_if_t<!std::is_default_constructible_v<T>>>
+{
+public:
+    Singleton() = default;
+
+    template <typename ... Args>
+    Singleton(const Args & ... args)
+    {
+        instance.reset(new T(args...));
+        /// TODO: throw exception on double-creation.
+    }
+
+    T * operator->()
+    {
+        return instance.get();
+    }
+
+private:
+    inline static std::unique_ptr<T> instance{};
+};
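
Usage note (illustration, not part of the patch): the new common/singleton.h is what lets MemoryTracker.cpp, QueryProfiler.cpp and Context.cpp share one TraceCollector instance without the old global trace_pipe. A minimal sketch of how the default-constructible specialization behaves follows; the Counter struct, the main() harness and the assumption that the header is reachable as <common/singleton.h> are illustrative only and not taken from the patch.

#include <cassert>

#include <common/singleton.h>  /// assumed include path for libs/libcommon/include/common/singleton.h

struct Counter
{
    int value = 0;
};

int main()
{
    /// Every Singleton<Counter> temporary resolves to the same function-local static,
    /// so state written through one handle is visible through any other handle.
    Singleton<Counter>()->value = 42;
    assert(Singleton<Counter>()->value == 42);

    /// This mirrors the wiring done in the patch: Context.cpp hands the trace log to the
    /// collector via setTraceLog(), while the memory-tracking and profiling hooks call
    /// collect() on the same instance, and shutdown goes through stop().
    return 0;
}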