Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
HAWQ-1811. Sync with OushuDB - Phase III
Browse files Browse the repository at this point in the history
  • Loading branch information
ztao1987 committed Dec 27, 2021
1 parent 4f67d22 commit d01b19b
Show file tree
Hide file tree
Showing 65 changed files with 1,968 additions and 1,124 deletions.
111 changes: 2 additions & 109 deletions contrib/extfmtcsv/extfmtcsv.c
Original file line number Diff line number Diff line change
Expand Up @@ -484,115 +484,8 @@ void buildFormatterOptionsInJson(PG_FUNCTION_ARGS, char **jsonStr)
}

/* add default settings for this formatter */
if (json_object_object_get(optJsonObject, "delimiter") == NULL)
{
json_object_object_add(optJsonObject, "delimiter",
json_object_new_string(
(externalFmtType == TextFormatTypeTXT) ? "\t" : ","));
}

if (json_object_object_get(optJsonObject, "null") == NULL)
{
json_object_object_add(optJsonObject, "null",
json_object_new_string(
(externalFmtType == TextFormatTypeTXT) ? "\\N" : ""));
}

if (json_object_object_get(optJsonObject, "fill_missing_fields") == NULL)
{
json_object_object_add(optJsonObject, "fill_missing_fields",
json_object_new_boolean(0));
}
else
{
json_object_object_del(optJsonObject, "fill_missing_fields");
json_object_object_add(optJsonObject, "fill_missing_fields",
json_object_new_boolean(1));
}

if (json_object_object_get(optJsonObject, "header") == NULL)
{
json_object_object_add(optJsonObject, "header",
json_object_new_boolean(0));
}
else
{
json_object_object_del(optJsonObject, "header");
json_object_object_add(optJsonObject, "header",
json_object_new_boolean(1));
}

if (json_object_object_get(optJsonObject, "reject_limit") == NULL)
{
json_object_object_add(optJsonObject, "reject_limit",
json_object_new_int(0));
}

if (json_object_object_get(optJsonObject, "err_table") == NULL)
{
json_object_object_add(optJsonObject, "err_table",
json_object_new_string(""));
}

if (json_object_object_get(optJsonObject, "newline") == NULL)
{
json_object_object_add(optJsonObject, "newline",
json_object_new_string("lf"));
}

if (json_object_object_get(optJsonObject, "encoding") == NULL)
{
const char *encodingStr = pg_encoding_to_char(
((FormatterData*) fcinfo->context)->fmt_external_encoding);
char lowerCaseEncodingStr[64];
strcpy(lowerCaseEncodingStr, encodingStr);
for (char *p = lowerCaseEncodingStr; *p != '\0'; ++p)
{
*p = tolower(*p);
}

json_object_object_add(optJsonObject, "encoding",
json_object_new_string(lowerCaseEncodingStr));
}

if (externalFmtType == TextFormatTypeCSV
&& json_object_object_get(optJsonObject, "quote") == NULL)
{
json_object_object_add(optJsonObject, "quote",
json_object_new_string("\""));
}

if (json_object_object_get(optJsonObject, "escape") == NULL)
{
if (externalFmtType == TextFormatTypeCSV)
{
/* Let escape follow quote's setting */
struct json_object *val = json_object_object_get(optJsonObject,
"quote");
json_object_object_add(optJsonObject, "escape",
json_object_new_string(json_object_get_string(val)));
}
else
{
json_object_object_add(optJsonObject, "escape",
json_object_new_string("\\"));
}
}

if (json_object_object_get(optJsonObject, "force_quote") == NULL)
{
json_object_object_add(optJsonObject, "force_quote",
json_object_new_string(""));
}

/* This is for csv formatter only */
if (externalFmtType == TextFormatTypeCSV
&& json_object_object_get(optJsonObject, "force_notnull") == NULL)
{
json_object_object_add(optJsonObject, "force_notnull",
json_object_new_string(""));
}

int encoding = ((FormatterData *)fcinfo->context)->fmt_external_encoding;
buildDefaultFormatterOptionsInJson(encoding, externalFmtType, optJsonObject);
*jsonStr = NULL;
if (optJsonObject != NULL)
{
Expand Down
10 changes: 10 additions & 0 deletions contrib/exthdfs/exthdfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,16 @@ Datum hdfsprotocol_validate(PG_FUNCTION_ARGS)
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("hdfsprotocol_validate : " "'force_quote' option is only available in 'csv' formatter")));
}
}

if (strcasecmp(de->defname, "header") == 0)
{
/* this is allowed only for readable table */
if (pvalidator_data->direction != EXT_VALIDATE_READ)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("hdfsprotocol_validate : " "'header' option is only available in readable external table")));
}
}
}

/* All urls should
Expand Down
8 changes: 8 additions & 0 deletions contrib/hornet/hornet.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "storage/fd.h"
#include "storage/filesystem.h"
#include "utils/builtins.h"
#include "utils/hawq_funcoid_mapping.h"

Datum ls_hdfs_dir(PG_FUNCTION_ARGS);

Expand Down Expand Up @@ -131,3 +132,10 @@ Datum ls_hdfs_dir(PG_FUNCTION_ARGS) {
SRF_RETURN_DONE(funcctx);
}
}

PG_FUNCTION_INFO_V1(is_supported_proc_in_NewQE);
Datum is_supported_proc_in_NewQE(PG_FUNCTION_ARGS) {
Oid a = PG_GETARG_OID(0);
int32_t mappingFuncId = HAWQ_FUNCOID_MAPPING(a);
PG_RETURN_BOOL(!(IS_HAWQ_MAPPING_FUNCID_INVALID(mappingFuncId)));
}
5 changes: 5 additions & 0 deletions contrib/hornet/load_hornet_helper_function.sql
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,8 @@ BEGIN
return contents_command;
END;
$$ LANGUAGE PLPGSQL;


drop function if exists is_supported_proc_in_NewQE(oid);

create function is_supported_proc_in_NewQE(oid) returns boolean as '$libdir/hornet','is_supported_proc_in_NewQE'language c immutable;
5 changes: 4 additions & 1 deletion contrib/orc/orc.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ Datum orc_validate_encodings(PG_FUNCTION_ARGS)
if (strncasecmp(encoding_name, "utf8", strlen("utf8")))
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("\"%s\" is not a valid encoding for ORC external table. ", encoding_name), errOmitLocation(true)));
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("\"%s\" is not a valid encoding for ORC external table. Encoding for ORC external table must be UTF8.", encoding_name), errOmitLocation(true)));
}

PG_RETURN_VOID() ;
Expand Down Expand Up @@ -358,6 +358,9 @@ Datum orc_validate_datatypes(PG_FUNCTION_ARGS) {
int4 tmp_typmod = typmod - VARHDRSZ;
int precision = (tmp_typmod >> 16) & 0xffff;
int scale = tmp_typmod & 0xffff;

if (typmod == -1 && strcasecmp(orc_enable_no_limit_numeric, "ON") == 0) continue; // for numeric without precision and scale.

if (precision < 1 || 38 < precision)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
Expand Down
28 changes: 28 additions & 0 deletions src/backend/access/common/reloptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "postgres.h"

#include "access/reloptions.h"
#include "access/orcam.h"
#include "catalog/pg_type.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbparquetstoragewrite.h"
Expand Down Expand Up @@ -321,13 +322,15 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
"bucketnum",
"dicthreshold",
"bloomfilter",
"stripesize",
};

char *values[ARRAY_SIZE(default_keywords)];
int32 fillfactor = defaultFillfactor;
int32 blocksize = DEFAULT_APPENDONLY_BLOCK_SIZE;
int32 pagesize = DEFAULT_PARQUET_PAGE_SIZE;
int32 rowgroupsize = DEFAULT_PARQUET_ROWGROUP_SIZE;
int32 stripesize = DEFAULT_ORC_STRIPE_SIZE;
bool appendonly = false;
bool checksum = false;
char* compresstype = NULL;
Expand Down Expand Up @@ -808,6 +811,30 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
errOmitLocation(true)));
}

/* stripesize */
if (values[13] != NULL)
{
if(!(columnstore == RELSTORAGE_ORC)){
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid option \'stripesize\' for non-orc table"),
errOmitLocation(true)));
}

stripesize = pg_atoi(values[13], sizeof(int32), 0);

if ((stripesize < MIN_ORC_STRIPE_SIZE) || (stripesize > MAX_ORC_STRIPE_SIZE))
{
if (validate)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("stripe size for orc table should between 1MB and 1GB and should be specified in MBytes. "
"Got %d MB", stripesize), errOmitLocation(true)));

stripesize = DEFAULT_ORC_STRIPE_SIZE;
}
}

// dicthreshold
if (values[11] != NULL) {
if(!(columnstore == RELSTORAGE_ORC)){
Expand Down Expand Up @@ -861,6 +888,7 @@ default_reloptions(Datum reloptions, bool validate, char relkind,
result->blocksize = blocksize;
result->pagesize = pagesize;
result->rowgroupsize = rowgroupsize;
result->stripesize = stripesize;
result->compresslevel = compresslevel;
if (compresstype != NULL)
for (j = 0;j < strlen(compresstype); j++)
Expand Down
45 changes: 27 additions & 18 deletions src/backend/access/external/url_curl.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,17 +643,26 @@ static int check_response(URL_FILE *file, int *rc, char **response_string) {
snprintf(connmsg, sizeof connmsg, "error code = %d (%s)",
(int)oserrno, strerror((int)oserrno));
}

ereport(
ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection with gpfdist failed for \"%s\", effective url: "
"\"%s\". %s",
file->url, effective_url, (oserrno != 0 ? connmsg : ""))));
// When still_running == 0 and os level err = "time out", we will not
// report error.
if (!(file->u.curl.still_running == 0 && oserrno == 110)) {
ereport(
ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection with gpfdist failed for \"%s\", effective url: "
"\"%s\". %s",
file->url, effective_url, (oserrno != 0 ? connmsg : ""))));
}
} else if (response_code ==
FDIST_TIMEOUT) // gpfdist server return timeout code
{
return FDIST_TIMEOUT;
// When still_running == 0 and gpfdist return err = "time out", we will
// not report error.
if (file->u.curl.still_running == 0) {
return 0;
} else {
return FDIST_TIMEOUT;
}
} else {
/* we need to sleep 1 sec to avoid this condition:
1- seg X gets an error message from gpfdist
Expand Down Expand Up @@ -1228,7 +1237,6 @@ URL_FILE *url_curl_fopen(char *url, bool forwrite, extvar_t *ev,
elog(ERROR, "internal error: curl_multi_add_handle failed (%d - %s)", e,
curl_easy_strerror(e));
}

while (CURLM_CALL_MULTI_PERFORM ==
(e = curl_multi_perform(multi_handle, &file->u.curl.still_running)))
;
Expand All @@ -1241,15 +1249,16 @@ URL_FILE *url_curl_fopen(char *url, bool forwrite, extvar_t *ev,
fill_buffer(file, 1);

/* check the connection for GET request */
// if connection is established, http_response should not be null
if (file->u.curl.still_running > 0 || file->u.curl.http_response == 0) {
if (check_response(file, &response_code, &response_string))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not open \"%s\" for reading", file->url),
errdetail("Unexpected response from gpfdist server: %d - %s",
response_code, response_string)));
}
// When other vseg has read all data and this vseg attend to read 1 byte to
// check connection, it may get error "timed out".
// If error is not "timed out", we will still report error.
if (check_response(file, &response_code, &response_string))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not open \"%s\" for reading", file->url),
errdetail("Unexpected response from gpfdist server: %d - %s",
response_code, response_string)));

if (file->u.curl.still_running == 0) {
elog(LOG,
"session closed when checking the connection in url_curl_fopen, "
Expand Down
46 changes: 46 additions & 0 deletions src/backend/access/orc/orcam.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ OrcInsertDescData *orcBeginInsert(Relation rel,
appendStringInfo(&option, "\"logicEof\": %" PRId64, segfileinfo->eof[0]);
appendStringInfo(&option, ", \"uncompressedEof\": %" PRId64,
segfileinfo->uncompressed_eof[0]);
appendStringInfo(
&option, ", \"stripeSize\": %" PRId64,
((StdRdOptions *)(rel->rd_options))->stripesize * 1024 * 1024);
if (aoentry->compresstype)
appendStringInfo(&option, ", %s", aoentry->compresstype);
appendStringInfoChar(&option, '}');
Expand Down Expand Up @@ -911,6 +914,49 @@ uint64 orcEndUpdate(OrcUpdateDescData *updateDesc) {
return callback.processedTupleCount;
}

int64_t *orcCreateIndex(Relation rel, int idxId, List *segno, int64 *eof,
List *columnsToRead, int sortIdx) {
checkOushuDbExtensiveFeatureSupport("ORC INDEX");
OrcScanDescData *scanDesc = palloc0(sizeof(OrcScanDescData));
OrcFormatData *orcFormatData = scanDesc->orcFormatData =
palloc0(sizeof(OrcFormatData));

RelationIncrementReferenceCount(rel);

TupleDesc desc = RelationGetDescr(rel);

scanDesc->rel = rel;
orcFormatData->fmt = ORCFormatNewORCFormatC("{}", 0);
initOrcFormatUserData(desc, orcFormatData);

int32 splitCount = list_length(segno);
int *columnsToReadList =
palloc0(sizeof(int) * orcFormatData->numberOfColumns);
for (int i = 0; i < list_length(columnsToRead); i++) {
columnsToReadList[list_nth_int(columnsToRead, i) - 1] = 1;
}
int *sortIdxList = palloc0(sizeof(int) * orcFormatData->numberOfColumns);
for (int i = 0; i < sortIdx; i++) {
sortIdxList[i] = list_nth_int(columnsToRead, i) - 1;
}

ORCFormatFileSplit *splits = palloc0(sizeof(ORCFormatFileSplit) * splitCount);
int32 filePathMaxLen = AOSegmentFilePathNameLen(rel) + 1;
for (int32 i = 0; i < splitCount; ++i) {
splits[i].fileName = palloc0(filePathMaxLen);
MakeAOSegmentFileName(rel, list_nth_int(segno, i), -1, dummyPlaceholder,
splits[i].fileName);
}

if (splitCount > 0) addFilesystemCredential(splits[0].fileName);
RelationDecrementReferenceCount(rel);
return ORCFormatCreateIndex(
idxId, splits, splitCount, eof, columnsToReadList, sortIdxList, sortIdx,
orcFormatData->colNames, orcFormatData->colDatatypes,
orcFormatData->colDatatypeMods, orcFormatData->numberOfColumns,
gp_session_id, rm_seg_tmp_dirs);
}

bool isDirectDispatch(Plan *plan) {
return plan->directDispatch.isDirectDispatch;
}
Loading

0 comments on commit d01b19b

Please sign in to comment.