Skip to content

Commit

Permalink
Update RGW Arrow Flight code to adjust to API changes in Apache Arrow.
Browse files Browse the repository at this point in the history
Signed-off-by: J. Eric Ivancich <[email protected]>
  • Loading branch information
ivancich committed May 3, 2024
1 parent 70c28bd commit 095cf27
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 22 deletions.
24 changes: 14 additions & 10 deletions src/rgw/rgw_flight.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include "arrow/type.h"
#include "arrow/buffer.h"
#include "arrow/util/string_view.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/reader.h"
#include "arrow/table.h"
Expand Down Expand Up @@ -175,7 +174,7 @@ arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
previous_key(null_flight_key)
{ }

arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
arrow::Result<std::unique_ptr<flt::FlightInfo>> Next() override {
std::optional<FlightData> fd = flight_store->after_key(previous_key);
if (fd) {
previous_key = fd->key;
Expand All @@ -188,11 +187,9 @@ arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,

ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj,
flt::FlightInfo::Make(*fd->schema, descriptor, endpoints, fd->num_records, fd->obj_size));
*info = std::make_unique<flt::FlightInfo>(std::move(info_obj));
return arw::Status::OK();
return std::make_unique<flt::FlightInfo>(std::move(info_obj));
} else {
*info = nullptr;
return arw::Status::OK();
return nullptr;
}
}
}; // class RGWFlightListing
Expand Down Expand Up @@ -346,7 +343,7 @@ class LocalInputStream : public arw::io::InputStream {
}
}

arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
// Peek is not supported on this stream: the call is logged and a
// NotImplemented status is handed back. `nbytes` is not consulted.
arw::Result<std::string_view> Peek(int64_t nbytes) override {
  INFO << "called, not implemented" << dendl;
  const arw::Status unsupported =
    arw::Status::NotImplemented("peek not currently allowed");
  return unsupported;
}
Expand Down Expand Up @@ -458,7 +455,7 @@ class LocalRandomAccessFile : public arw::io::RandomAccessFile {
return flight_data.obj_size;
}

arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
arw::Result<std::string_view> Peek(int64_t nbytes) override {
std::iostream::pos_type here = file.tellg();
if (here == -1) {
return arw::Status::IOError(
Expand Down Expand Up @@ -620,7 +617,7 @@ class RandomAccessObject : public arw::io::RandomAccessFile {
return flight_data.obj_size;
}

arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
arw::Result<std::string_view> Peek(int64_t nbytes) override {
INFO << "entered: " << nbytes << " bytes" << dendl;

int64_t saved_position = position;
Expand Down Expand Up @@ -716,7 +713,14 @@ arw::Status FlightServer::DoGet(const flt::ServerCallContext &context,

std::vector<std::shared_ptr<arw::RecordBatch>> batches;
arw::TableBatchReader batch_reader(*table);
ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
// Drain the reader one batch at a time. Under the Arrow RecordBatchReader
// contract, ReadNext() reports end-of-stream by returning an OK status with
// a null batch, so the null check -- not the status -- must end the loop.
// A non-OK status is a real read failure and is propagated to the caller
// rather than silently producing a truncated batch list.
while (true) {
  std::shared_ptr<arw::RecordBatch> p;
  ARROW_RETURN_NOT_OK(batch_reader.ReadNext(&p));
  if (!p) {
    break; // end of stream
  }
  batches.push_back(p);
}

ARROW_ASSIGN_OR_RAISE(auto owning_reader,
arw::RecordBatchReader::Make(
Expand Down
12 changes: 9 additions & 3 deletions src/rgw/rgw_flight.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "rgw_frontend.h"
#include "arrow/type.h"
#include "arrow/flight/server.h"
#include "arrow/util/string_view.h"

#include "rgw_flight_frontend.h"

Expand Down Expand Up @@ -122,6 +121,7 @@ class FlightServer : public flt::FlightServerBase {
FlightStore* flight_store;

std::map<std::string, Data1> data;
arw::Status serve_return_value;

public:

Expand All @@ -132,6 +132,12 @@ class FlightServer : public flt::FlightServerBase {
const DoutPrefix& dp);
~FlightServer() override;

// provides a version of Serve that has no return value, to avoid
// warnings when launching in a thread
void ServeAlt() {
serve_return_value = Serve();
}

FlightStore* get_flight_store() {
return flight_store;
}
Expand All @@ -153,14 +159,14 @@ class FlightServer : public flt::FlightServerBase {
std::unique_ptr<flt::FlightDataStream> *stream) override;
}; // class FlightServer

class OwningStringView : public arw::util::string_view {
class OwningStringView : public std::string_view {

uint8_t* buffer;
int64_t capacity;
int64_t consumed;

OwningStringView(uint8_t* _buffer, int64_t _size) :
arw::util::string_view((const char*) _buffer, _size),
std::string_view((const char*) _buffer, _size),
buffer(_buffer),
capacity(_size),
consumed(_size)
Expand Down
29 changes: 20 additions & 9 deletions src/rgw/rgw_flight_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ int FlightFrontend::init() {
}
const std::string url =
std::string("grpc+tcp://localhost:") + std::to_string(port);
flt::Location location;
arw::Status s = flt::Location::Parse(url, &location);
if (!s.ok()) {
ERROR << "couldn't parse url=" << url << ", status=" << s << dendl;
auto r = flt::Location::Parse(url);
if (!r.ok()) {
ERROR << "could not parse server uri: " << url << dendl;
return -EINVAL;
}
flt::Location location = *r;

flt::FlightServerOptions options(location);
options.verify_client = false;
s = env.flight_server->Init(options);
auto s = env.flight_server->Init(options);
if (!s.ok()) {
ERROR << "couldn't init flight server; status=" << s << dendl;
return -EINVAL;
Expand All @@ -85,7 +85,7 @@ int FlightFrontend::init() {
int FlightFrontend::run() {
try {
flight_thread = make_named_thread(server_thread_name,
&FlightServer::Serve,
&FlightServer::ServeAlt,
env.flight_server);

INFO << "FlightServer thread started, id=" <<
Expand All @@ -99,8 +99,19 @@ int FlightFrontend::run() {
}

// Stops the Flight server by calling Shutdown() and then Wait() on it.
// Each call is made exactly once and its arw::Status is checked: on the
// first failure the status is logged at ERROR level and the function
// returns early, so the "shut down" message is only logged when both
// calls succeed.
void FlightFrontend::stop() {
  arw::Status s = env.flight_server->Shutdown();
  if (!s.ok()) {
    ERROR << "call to Shutdown failed; status=" << s << dendl;
    return;
  }

  s = env.flight_server->Wait();
  if (!s.ok()) {
    ERROR << "call to Wait failed; status=" << s << dendl;
    return;
  }

  INFO << "FlightServer shut down" << dendl;
}

Expand Down Expand Up @@ -186,7 +197,7 @@ int FlightGetObj_Filter::handle_data(bufferlist& bl,
arrow::io::ReadableFile::Open(temp_file_name));
const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file);

file->Close();
ARROW_RETURN_NOT_OK(file->Close());

num_rows = metadata->num_rows();
kv_metadata = metadata->key_value_metadata();
Expand Down

0 comments on commit 095cf27

Please sign in to comment.