changes related to new version integration with RGW (#127)
* refactor base_s3object and its descendants; setup should happen once per query life-cycle

* integrate the limit operator into JSON processing (stop reading the object once the limit is reached)

* valgrind issues: fix an invalid read (array out of range); fix a memory leak caused by a missing destructor call

* fix the Parquet flow upon the limit operator; fix Parquet object construction; add debug functionality

* add error handling (exceptions)

* in the JSON flow the sql_result variable is set externally to avoid redundant copies; the RGW response system needs to use its own sql-result variable (see the sketch after the change summary)

* the result should be cleared before each call into JSON processing

Signed-off-by: galsalomon66 <[email protected]>
galsalomon66 authored Jan 24, 2023
1 parent 26f5f5d commit 2e96e6f
Showing 5 changed files with 178 additions and 77 deletions.
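
The example changes below all follow one driver pattern: the caller owns the sql-result buffer, clears it before every processing call, checks the limit operator, and treats only FATAL-severity exceptions as aborts. A minimal sketch of that pattern, based on the example code in this commit; the json_object constructor, parse_query, and the s3select.h header are assumptions taken from example/s3select_example.cpp, not a definitive API reference:

// Minimal sketch of the per-query life-cycle; see hedges in the lead-in.
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
#include "s3select.h"

using namespace s3selectEngine;

int run_json_query(const char* query, const char* fname, size_t object_sz)
{
  s3select s3select_syntax;
  if (s3select_syntax.parse_query(query) < 0)
    return -1;

  json_object json_query_processor(&s3select_syntax); // assumed ctor, as in process_json_query
  std::ifstream input_file_stream(fname, std::ios::binary);
  std::vector<char> buff(4 * 1024 * 1024);
  std::string result; // caller-owned; RGW supplies its own sql-result buffer

  auto read_sz = input_file_stream.readsome(buff.data(), buff.size());
  while (read_sz)
  {
    result.clear(); // must be cleared before each processing call
    int status = 0;
    try {
      status = json_query_processor.run_s3select_on_stream(result, buff.data(), read_sz, object_sz);
    } catch (base_s3select_exception& e) {
      std::cout << e.what() << std::endl;
      if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL)
        return -1; // abort query execution; non-fatal errors are only logged
    }
    std::cout << result;
    if (status < 0)
      return -1;
    if (json_query_processor.is_sql_limit_reached())
      break; // limit operator satisfied: stop reading the object
    read_sz = input_file_stream.readsome(buff.data(), buff.size());
  }

  result.clear();
  // a zero-length chunk marks end-of-object so the engine can flush pending output
  json_query_processor.run_s3select_on_stream(result, 0, 0, object_sz);
  std::cout << result;
  return 0;
}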
43 changes: 41 additions & 2 deletions example/s3select_example.cpp
@@ -306,8 +306,13 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file)
 
   std::function<int(std::string&)> fp_s3select_result_format = [](std::string& result){std::cout << result;result.clear();return 0;};
   std::function<int(std::string&)> fp_s3select_header_format = [](std::string& result){result="";return 0;};
+  std::function<void(const char*)> fp_debug = [](const char* msg)
+  {
+    std::cout << "DEBUG: {" << msg << "}" << std::endl;
+  };
 
   parquet_object parquet_processor(input_file,&s3select_syntax,&rgw);
+  //parquet_processor.set_external_debug_system(fp_debug);
 
   std::string result;
 
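This hunk and the CSV hunk further down wire the same debug hook, left commented out. Enabling it is a one-liner; a sketch, assuming set_external_debug_system accepts the std::function<void(const char*)> callback shown above (the call itself only appears commented out in this commit):

// hypothetical: route engine debug messages into the caller's logger
std::function<void(const char*)> fp_debug = [](const char* msg)
{
  std::cout << "DEBUG: {" << msg << "}" << std::endl;
};
parquet_processor.set_external_debug_system(fp_debug);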
@@ -385,18 +390,45 @@ int process_json_query(const char* input_query,const char* fname)
   while(read_sz)
   {
     std::cout << "read next chunk " << read_sz << std::endl;
+    result.clear();
 
+    try{
+      status = json_query_processor.run_s3select_on_stream(result, buff.data(), read_sz, object_sz);
+    } catch (base_s3select_exception &e)
+    {
+      std::cout << e.what() << std::endl;
+      if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
+      {
+        return -1;
+      }
+    }
+
-    status = json_query_processor.run_s3select_on_stream(result, buff.data(), read_sz, object_sz);
     std::cout << result << std::endl;
 
     if(status<0)
     {
       std::cout << "failure upon processing " << std::endl;
       return -1;
     }
+    if(json_query_processor.is_sql_limit_reached())
+    {
+      std::cout << "json processing reached limit " << std::endl;
+      break;
+    }
     read_sz = input_file_stream.readsome(buff.data(),BUFFER_SIZE);
   }
-  json_query_processor.run_s3select_on_stream(result, 0, 0, object_sz);
+  try{
+    result.clear();
+    json_query_processor.run_s3select_on_stream(result, 0, 0, object_sz);
+  } catch (base_s3select_exception &e)
+  {
+    std::cout << e.what() << std::endl;
+    if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL) //abort query execution
+    {
+      return -1;
+    }
+  }
+
   std::cout << result << std::endl;
 
   return 0;
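
Note the two-phase shape of the JSON flow above: each chunk is processed into a freshly cleared, caller-owned result buffer, and after the read loop a final call with a zero-length buffer gives the engine a chance to flush any pending output. Both call sites are wrapped in try/catch so that only FATAL-severity exceptions abort the query; non-fatal ones are merely logged.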
@@ -486,6 +518,13 @@ int run_on_localFile(char* input_query)
 
   s3selectEngine::csv_object s3_csv_object(&s3select_syntax, csv);
 
+  std::function<void(const char*)> fp_debug = [](const char* msg)
+  {
+    std::cout << "DEBUG" << msg << std::endl;
+  };
+
+  //s3_csv_object.set_external_debug_system(fp_debug);
+
 #define BUFF_SIZE (1024*1024*4) //simulate 4mb parts in s3 object
   char* buff = (char*)malloc( BUFF_SIZE );
   while(1)