Skip to content

Commit

Permalink
Merge pull request ClickHouse#61911 from ClickHouse/jamesweb
Browse files Browse the repository at this point in the history
Make userspace page cache work with 'web' disks
  • Loading branch information
alexey-milovidov authored Mar 27, 2024
2 parents fc96a57 + d5da446 commit 795026a
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 39 deletions.
7 changes: 3 additions & 4 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,6 @@ The server successfully detected this situation and will download merged part fr
M(PerfLocalMemoryReferences, "Local NUMA node memory reads") \
M(PerfLocalMemoryMisses, "Local NUMA node memory read misses") \
\
M(CreatedHTTPConnections, "Total amount of created HTTP connections (counter increase every time connection is created).") \
\
M(CannotWriteToWriteBufferDiscard, "Number of stack traces dropped by query profiler or signal handler because pipe is full or cannot write to pipe.") \
M(QueryProfilerSignalOverruns, "Number of times we drop processing of a query profiler signal due to overrun plus the number of signals that OS has not delivered due to overrun.") \
M(QueryProfilerConcurrencyOverruns, "Number of times we drop processing of a query profiler signal due to too many concurrent query profilers in other threads, which may indicate overload.") \
Expand Down Expand Up @@ -437,8 +435,6 @@ The server successfully detected this situation and will download merged part fr
M(ReadBufferFromS3ResetSessions, "Number of HTTP sessions that were reset in ReadBufferFromS3.") \
M(ReadBufferFromS3PreservedSessions, "Number of HTTP sessions that were preserved in ReadBufferFromS3.") \
\
M(ReadWriteBufferFromHTTPPreservedSessions, "Number of HTTP sessions that were preserved in ReadWriteBufferFromHTTP.") \
\
M(WriteBufferFromS3Microseconds, "Time spent on writing to S3.") \
M(WriteBufferFromS3Bytes, "Bytes written to S3.") \
M(WriteBufferFromS3RequestsErrors, "Number of exceptions while writing to S3.") \
Expand Down Expand Up @@ -727,6 +723,9 @@ The server successfully detected this situation and will download merged part fr
M(AddressesDiscovered, "Total count of new addresses in dns resolve results for http connections") \
M(AddressesExpired, "Total count of expired addresses which is no longer presented in dns resolve results for http connections") \
M(AddressesMarkedAsFailed, "Total count of addresses which has been marked as faulty due to connection errors for http connections") \
\
M(ReadWriteBufferFromHTTPRequestsSent, "Number of HTTP requests sent by ReadWriteBufferFromHTTP") \
M(ReadWriteBufferFromHTTPBytes, "Total size of payload bytes received and sent by ReadWriteBufferFromHTTP. Doesn't include HTTP headers.") \


#ifdef APPLY_FOR_EXTERNAL_EVENTS
Expand Down
3 changes: 0 additions & 3 deletions src/Coordination/KeeperConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@
M(PerfLocalMemoryReferences) \
M(PerfLocalMemoryMisses) \
\
M(CreatedHTTPConnections) \
M(CannotWriteToWriteBufferDiscard) \
\
M(S3ReadMicroseconds) \
Expand Down Expand Up @@ -180,8 +179,6 @@
M(ReadBufferFromS3RequestsErrors) \
M(ReadBufferFromS3ResetSessions) \
M(ReadBufferFromS3PreservedSessions) \
\
M(ReadWriteBufferFromHTTPPreservedSessions) \
\
M(WriteBufferFromS3Microseconds) \
M(WriteBufferFromS3Bytes) \
Expand Down
6 changes: 3 additions & 3 deletions src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
cache_key,
settings.remote_fs_cache,
FileCache::getCommonUser(),
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object_path); },
[=, this]() { return read_buffer_creator(/* restricted_seek */true, object); },
settings,
query_id,
object.bytes_size,
Expand All @@ -102,14 +102,14 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
/// former doesn't support seeks.
if (with_page_cache && !buf)
{
auto inner = read_buffer_creator(/* restricted_seek */false, object_path);
auto inner = read_buffer_creator(/* restricted_seek */false, object);
auto cache_key = FileChunkAddress { .path = cache_path_prefix + object_path };
buf = std::make_unique<CachedInMemoryReadBufferFromFile>(
cache_key, settings.page_cache, std::move(inner), settings);
}

if (!buf)
buf = read_buffer_creator(/* restricted_seek */true, object_path);
buf = read_buffer_creator(/* restricted_seek */true, object);

if (read_until_position > start_offset && read_until_position < start_offset + object.bytes_size)
buf->setReadUntilPosition(read_until_position - start_offset);
Expand Down
2 changes: 1 addition & 1 deletion src/Disks/IO/ReadBufferFromRemoteFSGather.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ReadBufferFromRemoteFSGather final : public ReadBufferFromFileBase
friend class ReadIndirectBufferFromRemoteFS;

public:
using ReadBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>(bool restricted_seek, const std::string & path)>;
using ReadBufferCreator = std::function<std::unique_ptr<ReadBufferFromFileBase>(bool restricted_seek, const StoredObject & object)>;

ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
Expand Down
31 changes: 22 additions & 9 deletions src/Disks/IO/ReadBufferFromWebServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ namespace ErrorCodes
ReadBufferFromWebServer::ReadBufferFromWebServer(
const String & url_,
ContextPtr context_,
size_t file_size_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t read_until_position_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
, log(getLogger("ReadBufferFromWebServer"))
, context(context_)
, url(url_)
Expand All @@ -36,7 +37,7 @@ ReadBufferFromWebServer::ReadBufferFromWebServer(
}


std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
std::unique_ptr<SeekableReadBuffer> ReadBufferFromWebServer::initialize()
{
Poco::URI uri(url);
if (read_until_position)
Expand Down Expand Up @@ -119,9 +120,8 @@ bool ReadBufferFromWebServer::nextImpl()

auto result = impl->next();

BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());

chassert(working_buffer.begin() == impl->buffer().begin());
working_buffer = impl->buffer();
pos = impl->position();

if (result)
offset += working_buffer.size();
Expand All @@ -132,16 +132,29 @@ bool ReadBufferFromWebServer::nextImpl()

off_t ReadBufferFromWebServer::seek(off_t offset_, int whence)
{
if (impl)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer");

if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed");

if (offset_ < 0)
throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_);

offset = offset_;
if (impl)
{
if (use_external_buffer)
{
impl->set(internal_buffer.begin(), internal_buffer.size());
}

impl->seek(offset_, SEEK_SET);

working_buffer = impl->buffer();
pos = impl->position();
offset = offset_ + available();
}
else
{
offset = offset_;
}

return offset;
}
Expand Down
5 changes: 3 additions & 2 deletions src/Disks/IO/ReadBufferFromWebServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ReadBufferFromWebServer : public ReadBufferFromFileBase
explicit ReadBufferFromWebServer(
const String & url_,
ContextPtr context_,
size_t file_size_,
const ReadSettings & settings_ = {},
bool use_external_buffer_ = false,
size_t read_until_position = 0);
Expand All @@ -39,15 +40,15 @@ class ReadBufferFromWebServer : public ReadBufferFromFileBase
bool supportsRightBoundedReads() const override { return true; }

private:
std::unique_ptr<ReadBuffer> initialize();
std::unique_ptr<SeekableReadBuffer> initialize();

LoggerPtr log;
ContextPtr context;

const String url;
size_t buf_size;

std::unique_ptr<ReadBuffer> impl;
std::unique_ptr<SeekableReadBuffer> impl;

ReadSettings read_settings;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL

auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(),
path,
object_.remote_path,
disk_read_settings,
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
Expand Down
3 changes: 2 additions & 1 deletion src/Disks/ObjectStorages/HDFS/HDFSObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
auto disk_read_settings = patchSettings(read_settings);
auto read_buffer_creator =
[this, disk_read_settings]
(bool /* restricted_seek */, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
const auto & path = object_.remote_path;
size_t begin_of_path = path.find('/', path.find("//") + 2);
auto hdfs_path = path.substr(begin_of_path);
auto hdfs_uri = path.substr(0, begin_of_path);
Expand Down
4 changes: 2 additions & 2 deletions src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[=] (bool /* restricted_seek */, const std::string & file_path)
[=] (bool /* restricted_seek */, const StoredObject & object)
-> std::unique_ptr<ReadBufferFromFileBase>
{
return createReadBufferFromFileBase(file_path, modified_settings, read_hint, file_size);
return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
};

switch (read_settings.remote_fs_method)
Expand Down
4 changes: 2 additions & 2 deletions src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT

auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(bool restricted_seek, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
(bool restricted_seek, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client.get(),
uri.bucket,
path,
object_.remote_path,
uri.version_id,
settings_ptr->request_settings,
disk_read_settings,
Expand Down
16 changes: 9 additions & 7 deletions src/Disks/ObjectStorages/Web/WebObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,17 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
std::optional<size_t>,
std::optional<size_t>) const
{
size_t object_size = object.bytes_size;
auto read_buffer_creator =
[this, read_settings]
(bool /* restricted_seek */, const std::string & path_) -> std::unique_ptr<ReadBufferFromFileBase>
[this, read_settings, object_size]
(bool /* restricted_seek */, const StoredObject & object_) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / path_,
getContext(),
read_settings,
/* use_external_buffer */true);
return std::make_unique<ReadBufferFromWebServer>(
fs::path(url) / object_.remote_path,
getContext(),
object_size,
read_settings,
/* use_external_buffer */true);
};

auto global_context = Context::getGlobalContextInstance();
Expand Down
12 changes: 11 additions & 1 deletion src/IO/ReadWriteBufferFromHTTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
namespace ProfileEvents
{
extern const Event ReadBufferSeekCancelConnection;
extern const Event ReadWriteBufferFromHTTPRequestsSent;
extern const Event ReadWriteBufferFromHTTPBytes;
}


Expand Down Expand Up @@ -245,6 +247,8 @@ ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callImpl(

auto session = makeHTTPSession(connection_group, uri_, timeouts, proxy_config);

ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPRequestsSent);

auto & stream_out = session->sendRequest(request);
if (out_stream_callback)
out_stream_callback(stream_out);
Expand Down Expand Up @@ -480,6 +484,8 @@ bool ReadWriteBufferFromHTTP::nextImpl()
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());

offset_from_begin_pos += working_buffer.size();

ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPBytes, working_buffer.size());
},
/*on_retry=*/ [&] ()
{
Expand Down Expand Up @@ -528,6 +534,8 @@ size_t ReadWriteBufferFromHTTP::readBigAt(char * to, size_t n, size_t offset, co

copyFromIStreamWithProgressCallback(*result.response_stream, to, n, progress_callback, &bytes_copied, &is_canceled);

ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPBytes, bytes_copied);

offset += bytes_copied;
total_bytes_copied += bytes_copied;
to += bytes_copied;
Expand All @@ -536,6 +544,8 @@ size_t ReadWriteBufferFromHTTP::readBigAt(char * to, size_t n, size_t offset, co
},
/*on_retry=*/ [&] ()
{
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPBytes, bytes_copied);

offset += bytes_copied;
total_bytes_copied += bytes_copied;
to += bytes_copied;
Expand Down Expand Up @@ -574,7 +584,7 @@ off_t ReadWriteBufferFromHTTP::seek(off_t offset_, int whence)
if (impl)
{
auto position = getPosition();
if (offset_ > position)
if (offset_ >= position)
{
size_t diff = offset_ - position;
if (diff < read_settings.remote_read_min_bytes_for_seek)
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/StorageS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,12 +725,12 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
auto context = getContext();
auto read_buffer_creator =
[this, read_settings, object_size]
(bool restricted_seek, const std::string & path) -> std::unique_ptr<ReadBufferFromFileBase>
(bool restricted_seek, const StoredObject & object) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client,
bucket,
path,
object.remote_path,
version_id,
request_settings,
read_settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@
</cached_web>
</policies>
</storage_configuration>

<query_log></query_log>
</clickhouse>
Loading

0 comments on commit 795026a

Please sign in to comment.