Skip to content

Commit

Permalink
Allow distributed execution from run_command_on_* functions
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoslot committed May 20, 2022
1 parent b1d1df8 commit ad5214b
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 138 deletions.
34 changes: 28 additions & 6 deletions src/backend/distributed/commands/citus_global_signal.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "distributed/backend_data.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"
#include "lib/stringinfo.h"
#include "signal.h"
Expand Down Expand Up @@ -111,18 +112,39 @@ CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig)
#endif
}

StringInfo queryResult = makeStringInfo();
int connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags,
workerNode->workerName,
workerNode->workerPort);

bool reportResultError = true;
if (!SendRemoteCommand(connection, cancelQuery->data))
{
/* if we cannot connect, we warn and report false */
ReportConnectionError(connection, WARNING);
return false;
}

bool success = ExecuteRemoteQueryOrCommand(workerNode->workerName,
workerNode->workerPort, cancelQuery->data,
queryResult, reportResultError);
bool raiseInterrupts = true;
PGresult *queryResult = GetRemoteCommandResult(connection, raiseInterrupts);

if (success && queryResult && strcmp(queryResult->data, "f") == 0)
/* if remote node throws an error, we also throw an error */
if (!IsResponseOK(queryResult))
{
ReportResultError(connection, queryResult, ERROR);
}

StringInfo queryResultString = makeStringInfo();
bool success = EvaluateSingleQueryResult(connection, queryResult, queryResultString);
if (success && strcmp(queryResultString->data, "f") == 0)
{
/* worker node returned "f" */
success = false;
}

PQclear(queryResult);

bool raiseErrors = false;
ClearResults(connection, raiseErrors);

return success;
}
12 changes: 12 additions & 0 deletions src/backend/distributed/connection/connection_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,18 @@ IsRebalancerInternalBackend(void)
}


/*
* IsCitusRunCommandBackend returns true if we are in a backend that one of
* the run_command_on_* functions initiated.
*/
bool
IsCitusRunCommandBackend(void)
{
return application_name &&
strcmp(application_name, CITUS_RUN_COMMAND_APPLICATION_NAME) == 0;
}


/*
* IsCitusInitiatedRemoteBackend returns true if we are in a backend that citus
* initiated via remote connection.
Expand Down
89 changes: 89 additions & 0 deletions src/backend/distributed/connection/remote_commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -1115,3 +1115,92 @@ SendCancelationRequest(MultiConnection *connection)

return cancelSent;
}


/*
* EvaluateSingleQueryResult gets the query result from connection and returns
* true if the query is executed successfully, false otherwise. A query result
* or an error message is returned in queryResultString. The function requires
* that the query returns a single column/single row result. It returns an
* error otherwise.
*/
bool
EvaluateSingleQueryResult(MultiConnection *connection, PGresult *queryResult,
StringInfo queryResultString)
{
bool success = false;

ExecStatusType resultStatus = PQresultStatus(queryResult);
if (resultStatus == PGRES_COMMAND_OK)
{
char *commandStatus = PQcmdStatus(queryResult);
appendStringInfo(queryResultString, "%s", commandStatus);
success = true;
}
else if (resultStatus == PGRES_TUPLES_OK)
{
int ntuples = PQntuples(queryResult);
int nfields = PQnfields(queryResult);

/* error if query returns more than 1 rows, or more than 1 fields */
if (nfields != 1)
{
appendStringInfo(queryResultString,
"expected a single column in query target");
}
else if (ntuples > 1)
{
appendStringInfo(queryResultString,
"expected a single row in query result");
}
else
{
int row = 0;
int column = 0;
if (!PQgetisnull(queryResult, row, column))
{
char *queryResultValue = PQgetvalue(queryResult, row, column);
appendStringInfo(queryResultString, "%s", queryResultValue);
}
success = true;
}
}
else
{
StoreErrorMessage(connection, queryResultString);
}

return success;
}


/*
* StoreErrorMessage gets the error message from connection and stores it
* in queryResultString. It should be called only when error is present
* otherwise it would return a default error message.
*/
void
StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString)
{
char *errorMessage = PQerrorMessage(connection->pgConn);
if (errorMessage != NULL)
{
/* copy the error message to a writable memory */
errorMessage = pnstrdup(errorMessage, strlen(errorMessage));

char *firstNewlineIndex = strchr(errorMessage, '\n');

/* trim the error message at the line break */
if (firstNewlineIndex != NULL)
{
*firstNewlineIndex = '\0';
}
}
else
{
/* put a default error message if no error message is reported */
errorMessage = "An error occurred while running the query";
}

appendStringInfo(queryResultString, "%s", errorMessage);
}
Loading

0 comments on commit ad5214b

Please sign in to comment.