Skip to content

Commit

Permalink
upon trino pushdown s3select requests, some of the expressions may fa…
Browse files Browse the repository at this point in the history
…iled the s3select-parser due to trino expresion conversion, (#128)

such as qoute presentation, <> operator, redundant characters at the end-of-statement(white-space, semicolon).
Trino rejects RGW response with HIVE_CURSOR_ERROR due to its size, thus, s3select-engine will use "paging" system (instead of sending in a big chunks)

Signed-off-by: galsalomon66 <[email protected]>
  • Loading branch information
galsalomon66 authored Mar 2, 2023
1 parent 2e96e6f commit 874752f
Showing 1 changed file with 59 additions and 25 deletions.
84 changes: 59 additions & 25 deletions include/s3select.h
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ struct s3select : public bsc::grammar<s3select>
{
///// s3select syntax rules and actions for building AST

select_expr = (select_expr_base_ >> ';') | select_expr_base_;
select_expr = select_expr_base_ >> bsc::lexeme_d[ *(bsc::str_p(" ")|bsc::str_p(";")) ];

select_expr_base_ = select_expr_base >> S3SELECT_KW("limit") >> (limit_number)[BOOST_BIND_ACTION(push_limit_clause)] | select_expr_base;

Expand Down Expand Up @@ -818,7 +818,7 @@ struct s3select : public bsc::grammar<s3select>

addsubop_operator = bsc::str_p("+") | bsc::str_p("-");

arith_cmp = bsc::str_p(">=") | bsc::str_p("<=") | bsc::str_p("=") | bsc::str_p("<") | bsc::str_p(">") | bsc::str_p("!=");
arith_cmp = bsc::str_p("<>") | bsc::str_p(">=") | bsc::str_p("<=") | bsc::str_p("=") | bsc::str_p("<") | bsc::str_p(">") | bsc::str_p("!=");

and_op = S3SELECT_KW("and");

Expand Down Expand Up @@ -1221,7 +1221,7 @@ void push_compare_operator::builder(s3select* self, const char* a, const char* b
{
c = arithmetic_operand::cmp_t::EQ;
}
else if (token == "!=")
else if (token == "!=" || token == "<>")
{
c = arithmetic_operand::cmp_t::NE;
}
Expand Down Expand Up @@ -2043,6 +2043,7 @@ class base_s3object
bool m_is_limit_on;
unsigned long m_limit;
unsigned long m_processed_rows;
size_t m_returned_bytes_size;
std::function<void(const char*)> fp_ext_debug_mesg;//dispache debug message into external system

public:
Expand Down Expand Up @@ -2103,7 +2104,7 @@ class base_s3object
m_processed_rows = 0;
}

base_s3object():m_sa(nullptr),m_is_to_aggregate(false),m_where_clause(nullptr),m_s3_select(nullptr),m_error_count(0),m_sql_processing_status(Status::INITIAL_STAT){}
base_s3object():m_sa(nullptr),m_is_to_aggregate(false),m_where_clause(nullptr),m_s3_select(nullptr),m_error_count(0),m_returned_bytes_size(0),m_sql_processing_status(Status::INITIAL_STAT){}

explicit base_s3object(s3select* m):base_s3object()
{
Expand All @@ -2126,6 +2127,11 @@ class base_s3object
fp_ext_debug_mesg = fp_external;
}

size_t get_return_result_size()
{
return m_returned_bytes_size;
}

void result_values_to_string(multi_values& projections_resuls, std::string& result)
{
size_t i = 0;
Expand All @@ -2134,30 +2140,36 @@ class base_s3object

for(auto& res : projections_resuls.values)
{
if(fp_ext_debug_mesg)
if(fp_ext_debug_mesg)
fp_ext_debug_mesg( res->to_string() );

if (m_csv_defintion.quote_fields_always) {
std::ostringstream quoted_result;
quoted_result << std::quoted(res->to_string(),m_csv_defintion.output_quot_char, m_csv_defintion.escape_char);
result.append(quoted_result.str());
m_returned_bytes_size += quoted_result.str().size();
}//TODO to add asneeded
else
{
result.append(res->to_string());
m_returned_bytes_size += strlen(res->to_string());
}

if(!m_csv_defintion.redundant_column) {
if(++i < projections_resuls.values.size()) {
result.append(output_delimiter);
m_returned_bytes_size += output_delimiter.size();
}
}
else {
result.append(output_delimiter);
m_returned_bytes_size += output_delimiter.size();
}
}
if(!m_aggr_flow)
result.append(output_row_delimiter);
if(!m_aggr_flow){
result.append(output_row_delimiter);
m_returned_bytes_size += output_delimiter.size();
}
}

Status getMatchRow( std::string& result)
Expand Down Expand Up @@ -2197,7 +2209,6 @@ class base_s3object
throw base_s3select_exception("on aggregation query , can not stream row data post do-aggregate call", base_s3select_exception::s3select_exp_en_t::FATAL);
}


for (auto& a : *m_s3_select->get_aliases()->get())
{
a.second->invalidate_cache_result();
Expand All @@ -2208,33 +2219,30 @@ class base_s3object
{
columnar_fetch_projection();
for (auto i : m_projections)
{
i->eval();
{
i->eval();
}
}

if(m_is_limit_on && m_processed_rows == m_limit)
{
for (auto& i : m_projections)
{
i->set_last_call();
i->set_skip_non_aggregate(false);//projection column is set to be runnable

projections_resuls.push_value( &(i->eval()) );
}

for (auto& i : m_projections)
{
i->set_last_call();
i->set_skip_non_aggregate(false);//projection column is set to be runnable
projections_resuls.push_value( &(i->eval()) );
}
result_values_to_string(projections_resuls,result);
return m_sql_processing_status = Status::LIMIT_REACHED;
return m_sql_processing_status = Status::LIMIT_REACHED;
}

}
while (multiple_row_processing());
}
else
{
do
{
row_fetch_data();
row_fetch_data();
columnar_fetch_where_clause_columns();
if(is_end_of_stream())
{
Expand All @@ -2259,7 +2267,7 @@ class base_s3object

if(!multiple_row_processing())
{
found = !m_where_clause || m_where_clause->eval().is_true();
found = !m_where_clause || m_where_clause->eval().is_true();
}

if(found)
Expand All @@ -2274,7 +2282,6 @@ class base_s3object
}

}

return is_end_of_stream() ? (m_sql_processing_status = Status::END_OF_STREAM) : (m_sql_processing_status = Status::NORMAL_EXIT);

}//getMatchRow
Expand All @@ -2283,7 +2290,8 @@ class base_s3object

}; //base_s3object


//TODO config / default-value
#define CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT (4 * 1024)
class csv_object : public base_s3object
{

Expand Down Expand Up @@ -2348,6 +2356,16 @@ class csv_object : public base_s3object
size_t m_processed_bytes;
int64_t m_number_of_tokens;

std::function<int(std::string&)> fp_s3select_result_format=nullptr;
std::function<int(std::string&)> fp_s3select_header_format=nullptr;
public:
void set_result_formatters( std::function<int(std::string&)>& result_format,
std::function<int(std::string&)>& header_format)
{
fp_s3select_result_format = result_format;
fp_s3select_header_format = header_format;
}
private:
int getNextRow()
{
size_t num_of_tokens=0;
Expand Down Expand Up @@ -2375,7 +2393,6 @@ class csv_object : public base_s3object
virtual ~csv_object() = default;

public:

virtual bool is_end_of_stream()
{
return m_number_of_tokens < 0;
Expand Down Expand Up @@ -2534,6 +2551,7 @@ class csv_object : public base_s3object
m_csv_defintion.comment_chars,
m_csv_defintion.trim_chars);


if(m_extract_csv_header_info == false)
{
extract_csv_header_info();
Expand Down Expand Up @@ -2561,6 +2579,15 @@ class csv_object : public base_s3object
}
}

if(fp_s3select_result_format && fp_s3select_header_format)
{
if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
{//there are systems that might resject the response due to its size.
fp_s3select_result_format(result);
fp_s3select_header_format(result);
}
}

if (m_sql_processing_status == Status::END_OF_STREAM)
{
break;
Expand All @@ -2572,6 +2599,13 @@ class csv_object : public base_s3object

} while (true);

if(fp_s3select_result_format && fp_s3select_header_format)
{ //note: it may produce empty response(more the once)
//upon empty result, it should return *only* upon last call.
fp_s3select_result_format(result);
fp_s3select_header_format(result);
}

return 0;
}
};
Expand Down

0 comments on commit 874752f

Please sign in to comment.