Skip to content

Commit 21734d2

Browse files
committed
Support writable foreign tables.
This patch adds the core-system infrastructure needed to support updates on foreign tables, and extends contrib/postgres_fdw to allow updates against remote Postgres servers. There's still a great deal of room for improvement in optimization of remote updates, but at least there's basic functionality there now. KaiGai Kohei, reviewed by Alexander Korotkov and Laurenz Albe, and rather heavily revised by Tom Lane.
1 parent 7f49a67 commit 21734d2

29 files changed

+3671
-346
lines changed

contrib/file_fdw/input/file_fdw.source

-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ SELECT tableoid::regclass, b FROM agg_csv;
118118
INSERT INTO agg_csv VALUES(1,2.0);
119119
UPDATE agg_csv SET a = 1;
120120
DELETE FROM agg_csv WHERE a = 100;
121-
SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
122121
-- but this should be ignored
123122
SELECT * FROM agg_csv FOR UPDATE;
124123

contrib/file_fdw/output/file_fdw.source

+3-7
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,11 @@ SELECT tableoid::regclass, b FROM agg_csv;
185185

186186
-- updates aren't supported
187187
INSERT INTO agg_csv VALUES(1,2.0);
188-
ERROR: cannot change foreign table "agg_csv"
188+
ERROR: cannot insert into foreign table "agg_csv"
189189
UPDATE agg_csv SET a = 1;
190-
ERROR: cannot change foreign table "agg_csv"
190+
ERROR: cannot update foreign table "agg_csv"
191191
DELETE FROM agg_csv WHERE a = 100;
192-
ERROR: cannot change foreign table "agg_csv"
193-
SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
194-
ERROR: row-level locks cannot be used with foreign table "agg_csv"
195-
LINE 1: SELECT * FROM agg_csv FOR UPDATE OF agg_csv;
196-
^
192+
ERROR: cannot delete from foreign table "agg_csv"
197193
-- but this should be ignored
198194
SELECT * FROM agg_csv FOR UPDATE;
199195
a | b

contrib/postgres_fdw/connection.c

+65-2
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,18 @@ typedef struct ConnCacheEntry
4747
PGconn *conn; /* connection to foreign server, or NULL */
4848
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
4949
* one level of subxact open, etc */
50+
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
51+
bool have_error; /* have any subxacts aborted in this xact? */
5052
} ConnCacheEntry;
5153

5254
/*
5355
* Connection cache (initialized on first use)
5456
*/
5557
static HTAB *ConnectionHash = NULL;
5658

57-
/* for assigning cursor numbers */
59+
/* for assigning cursor numbers and prepared statement numbers */
5860
static unsigned int cursor_number = 0;
61+
static unsigned int prep_stmt_number = 0;
5962

6063
/* tracks whether any work is needed in callback functions */
6164
static bool xact_got_connection = false;
@@ -78,6 +81,10 @@ static void pgfdw_subxact_callback(SubXactEvent event,
7881
* if we don't already have a suitable one, and a transaction is opened at
7982
* the right subtransaction nesting depth if we didn't do that already.
8083
*
84+
* will_prep_stmt must be true if caller intends to create any prepared
85+
* statements. Since those don't go away automatically at transaction end
86+
* (not even on error), we need this flag to cue manual cleanup.
87+
*
8188
* XXX Note that caching connections theoretically requires a mechanism to
8289
* detect change of FDW objects to invalidate already established connections.
8390
* We could manage that by watching for invalidation events on the relevant
@@ -86,7 +93,8 @@ static void pgfdw_subxact_callback(SubXactEvent event,
8693
* mid-transaction anyway.
8794
*/
8895
PGconn *
89-
GetConnection(ForeignServer *server, UserMapping *user)
96+
GetConnection(ForeignServer *server, UserMapping *user,
97+
bool will_prep_stmt)
9098
{
9199
bool found;
92100
ConnCacheEntry *entry;
@@ -131,6 +139,8 @@ GetConnection(ForeignServer *server, UserMapping *user)
131139
/* initialize new hashtable entry (key is already filled in) */
132140
entry->conn = NULL;
133141
entry->xact_depth = 0;
142+
entry->have_prep_stmt = false;
143+
entry->have_error = false;
134144
}
135145

136146
/*
@@ -147,6 +157,8 @@ GetConnection(ForeignServer *server, UserMapping *user)
147157
if (entry->conn == NULL)
148158
{
149159
entry->xact_depth = 0; /* just to be sure */
160+
entry->have_prep_stmt = false;
161+
entry->have_error = false;
150162
entry->conn = connect_pg_server(server, user);
151163
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
152164
entry->conn, server->servername);
@@ -157,6 +169,9 @@ GetConnection(ForeignServer *server, UserMapping *user)
157169
*/
158170
begin_remote_xact(entry);
159171

172+
/* Remember if caller will prepare statements */
173+
entry->have_prep_stmt |= will_prep_stmt;
174+
160175
return entry->conn;
161176
}
162177

@@ -393,13 +408,31 @@ GetCursorNumber(PGconn *conn)
393408
return ++cursor_number;
394409
}
395410

411+
/*
412+
* Assign a "unique" number for a prepared statement.
413+
*
414+
* This works much like GetCursorNumber, except that we never reset the counter
415+
* within a session. That's because we can't be 100% sure we've gotten rid
416+
* of all prepared statements on all connections, and it's not really worth
417+
* increasing the risk of prepared-statement name collisions by resetting.
418+
*/
419+
unsigned int
420+
GetPrepStmtNumber(PGconn *conn)
421+
{
422+
return ++prep_stmt_number;
423+
}
424+
396425
/*
397426
* Report an error we got from the remote server.
398427
*
399428
* elevel: error level to use (typically ERROR, but might be less)
400429
* res: PGresult containing the error
401430
* clear: if true, PQclear the result (otherwise caller will handle it)
402431
* sql: NULL, or text of remote command we tried to execute
432+
*
433+
* Note: callers that choose not to throw ERROR for a remote error are
434+
* responsible for making sure that the associated ConnCacheEntry gets
435+
* marked with have_error = true.
403436
*/
404437
void
405438
pgfdw_report_error(int elevel, PGresult *res, bool clear, const char *sql)
@@ -480,6 +513,22 @@ pgfdw_xact_callback(XactEvent event, void *arg)
480513
if (PQresultStatus(res) != PGRES_COMMAND_OK)
481514
pgfdw_report_error(ERROR, res, true, "COMMIT TRANSACTION");
482515
PQclear(res);
516+
517+
/*
518+
* If there were any errors in subtransactions, and we made
519+
* prepared statements, do a DEALLOCATE ALL to make sure we
520+
* get rid of all prepared statements. This is annoying and
521+
* not terribly bulletproof, but it's probably not worth
522+
* trying harder. We intentionally ignore any errors in the
523+
* DEALLOCATE.
524+
*/
525+
if (entry->have_prep_stmt && entry->have_error)
526+
{
527+
res = PQexec(entry->conn, "DEALLOCATE ALL");
528+
PQclear(res);
529+
}
530+
entry->have_prep_stmt = false;
531+
entry->have_error = false;
483532
break;
484533
case XACT_EVENT_PRE_PREPARE:
485534

@@ -502,14 +551,26 @@ pgfdw_xact_callback(XactEvent event, void *arg)
502551
elog(ERROR, "missed cleaning up connection during pre-commit");
503552
break;
504553
case XACT_EVENT_ABORT:
554+
/* Assume we might have lost track of prepared statements */
555+
entry->have_error = true;
505556
/* If we're aborting, abort all remote transactions too */
506557
res = PQexec(entry->conn, "ABORT TRANSACTION");
507558
/* Note: can't throw ERROR, it would be infinite loop */
508559
if (PQresultStatus(res) != PGRES_COMMAND_OK)
509560
pgfdw_report_error(WARNING, res, true,
510561
"ABORT TRANSACTION");
511562
else
563+
{
512564
PQclear(res);
565+
/* As above, make sure we've cleared any prepared stmts */
566+
if (entry->have_prep_stmt && entry->have_error)
567+
{
568+
res = PQexec(entry->conn, "DEALLOCATE ALL");
569+
PQclear(res);
570+
}
571+
entry->have_prep_stmt = false;
572+
entry->have_error = false;
573+
}
513574
break;
514575
}
515576

@@ -593,6 +654,8 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
593654
}
594655
else
595656
{
657+
/* Assume we might have lost track of prepared statements */
658+
entry->have_error = true;
596659
/* Rollback all remote subtransactions during abort */
597660
snprintf(sql, sizeof(sql),
598661
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",

0 commit comments

Comments
 (0)