Skip to content

Commit 3709ca1

Browse files
committed
pg_basebackup: Add option to create replication slot
When requesting a particular replication slot, the new pg_basebackup option -C/--create-slot creates it before starting to replicate from it. Further refactor the slot creation logic to include the temporary slot creation logic into the same function. Add new arguments is_temporary and preserve_wal to CreateReplicationSlot(). Print in --verbose mode that a slot has been created. Author: Michael Banck <[email protected]>
1 parent 59597e6 commit 3709ca1

File tree

9 files changed

+112
-41
lines changed

9 files changed

+112
-41
lines changed

doc/src/sgml/ref/pg_basebackup.sgml

+16
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,18 @@ PostgreSQL documentation
382382
</listitem>
383383
</varlistentry>
384384

385+
<varlistentry>
386+
<term><option>-C</option></term>
387+
<term><option>--create-slot</option></term>
388+
<listitem>
389+
<para>
390+
This option causes the replication slot specified by the
391+
option <literal>--slot</literal> to be created before starting the
392+
backup. In this case, an error is raised if the slot already exists.
393+
</para>
394+
</listitem>
395+
</varlistentry>
396+
385397
<varlistentry>
386398
<term><option>-l <replaceable class="parameter">label</replaceable></option></term>
387399
<term><option>--label=<replaceable class="parameter">label</replaceable></option></term>
@@ -462,6 +474,10 @@ PostgreSQL documentation
462474
the server does not remove any necessary WAL data in the time between
463475
the end of the base backup and the start of streaming replication.
464476
</para>
477+
<para>
478+
The specified replication slot has to exist unless the
479+
option <option>-C</option> is also used.
480+
</para>
465481
<para>
466482
If this option is not specified and the server supports temporary
467483
replication slots (version 10 and later), then a temporary replication

src/bin/pg_basebackup/pg_basebackup.c

+54-9
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ static pg_time_t last_progress_report = 0;
9393
static int32 maxrate = 0; /* no limit by default */
9494
static char *replication_slot = NULL;
9595
static bool temp_replication_slot = true;
96+
static bool create_slot = false;
97+
static bool no_slot = false;
9698

9799
static bool success = false;
98100
static bool made_new_pgdata = false;
@@ -346,6 +348,7 @@ usage(void)
346348
printf(_("\nGeneral options:\n"));
347349
printf(_(" -c, --checkpoint=fast|spread\n"
348350
" set fast or spread checkpointing\n"));
351+
printf(_(" -C, --create-slot create replication slot\n"));
349352
printf(_(" -l, --label=LABEL set backup label\n"));
350353
printf(_(" -n, --no-clean do not clean up after errors\n"));
351354
printf(_(" -N, --no-sync do not wait for changes to be written safely to disk\n"));
@@ -466,7 +469,6 @@ typedef struct
466469
char xlog[MAXPGPATH]; /* directory or tarfile depending on mode */
467470
char *sysidentifier;
468471
int timeline;
469-
bool temp_slot;
470472
} logstreamer_param;
471473

472474
static int
@@ -492,9 +494,6 @@ LogStreamerMain(logstreamer_param *param)
492494
stream.mark_done = true;
493495
stream.partial_suffix = NULL;
494496
stream.replication_slot = replication_slot;
495-
stream.temp_slot = param->temp_slot;
496-
if (stream.temp_slot && !stream.replication_slot)
497-
stream.replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
498497

499498
if (format == 'p')
500499
stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
@@ -583,9 +582,29 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
583582

584583
/* Temporary replication slots are only supported in 10 and newer */
585584
if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS)
586-
param->temp_slot = false;
587-
else
588-
param->temp_slot = temp_replication_slot;
585+
temp_replication_slot = false;
586+
587+
/*
588+
* Create replication slot if requested
589+
*/
590+
if (temp_replication_slot && !replication_slot)
591+
replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn));
592+
if (temp_replication_slot || create_slot)
593+
{
594+
if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
595+
temp_replication_slot, true, true, false))
596+
disconnect_and_exit(1);
597+
598+
if (verbose)
599+
{
600+
if (temp_replication_slot)
601+
fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"),
602+
progname, replication_slot);
603+
else
604+
fprintf(stderr, _("%s: created replication slot \"%s\"\n"),
605+
progname, replication_slot);
606+
}
607+
}
589608

590609
if (format == 'p')
591610
{
@@ -2079,6 +2098,7 @@ main(int argc, char **argv)
20792098
{"pgdata", required_argument, NULL, 'D'},
20802099
{"format", required_argument, NULL, 'F'},
20812100
{"checkpoint", required_argument, NULL, 'c'},
2101+
{"create-slot", no_argument, NULL, 'C'},
20822102
{"max-rate", required_argument, NULL, 'r'},
20832103
{"write-recovery-conf", no_argument, NULL, 'R'},
20842104
{"slot", required_argument, NULL, 'S'},
@@ -2105,7 +2125,6 @@ main(int argc, char **argv)
21052125
int c;
21062126

21072127
int option_index;
2108-
bool no_slot = false;
21092128

21102129
progname = get_progname(argv[0]);
21112130
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
@@ -2127,11 +2146,14 @@ main(int argc, char **argv)
21272146

21282147
atexit(cleanup_directories_atexit);
21292148

2130-
while ((c = getopt_long(argc, argv, "D:F:r:RT:X:l:nNzZ:d:c:h:p:U:s:S:wWvP",
2149+
while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWvP",
21312150
long_options, &option_index)) != -1)
21322151
{
21332152
switch (c)
21342153
{
2154+
case 'C':
2155+
create_slot = true;
2156+
break;
21352157
case 'D':
21362158
basedir = pg_strdup(optarg);
21372159
break;
@@ -2348,6 +2370,29 @@ main(int argc, char **argv)
23482370
temp_replication_slot = false;
23492371
}
23502372

2373+
if (create_slot)
2374+
{
2375+
if (!replication_slot)
2376+
{
2377+
fprintf(stderr,
2378+
_("%s: --create-slot needs a slot to be specified using --slot\n"),
2379+
progname);
2380+
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
2381+
progname);
2382+
exit(1);
2383+
}
2384+
2385+
if (no_slot)
2386+
{
2387+
fprintf(stderr,
2388+
_("%s: --create-slot and --no-slot are incompatible options\n"),
2389+
progname);
2390+
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
2391+
progname);
2392+
exit(1);
2393+
}
2394+
}
2395+
23512396
if (xlog_dir)
23522397
{
23532398
if (format != 'p')

src/bin/pg_basebackup/pg_receivewal.c

+1-2
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,6 @@ StreamLog(void)
431431
stream.do_sync);
432432
stream.partial_suffix = ".partial";
433433
stream.replication_slot = replication_slot;
434-
stream.temp_slot = false;
435434

436435
ReceiveXlogStream(conn, &stream);
437436

@@ -728,7 +727,7 @@ main(int argc, char **argv)
728727
_("%s: creating replication slot \"%s\"\n"),
729728
progname, replication_slot);
730729

731-
if (!CreateReplicationSlot(conn, replication_slot, NULL, true,
730+
if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
732731
slot_exists_ok))
733732
disconnect_and_exit(1);
734733
disconnect_and_exit(0);

src/bin/pg_basebackup/pg_recvlogical.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -979,8 +979,8 @@ main(int argc, char **argv)
979979
_("%s: creating replication slot \"%s\"\n"),
980980
progname, replication_slot);
981981

982-
if (!CreateReplicationSlot(conn, replication_slot, plugin,
983-
false, slot_exists_ok))
982+
if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
983+
false, false, slot_exists_ok))
984984
disconnect_and_exit(1);
985985
startpos = InvalidXLogRecPtr;
986986
}

src/bin/pg_basebackup/receivelog.c

-18
Original file line numberDiff line numberDiff line change
@@ -522,24 +522,6 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
522522
PQclear(res);
523523
}
524524

525-
/*
526-
* Create temporary replication slot if one is needed
527-
*/
528-
if (stream->temp_slot)
529-
{
530-
snprintf(query, sizeof(query),
531-
"CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL",
532-
stream->replication_slot);
533-
res = PQexec(conn, query);
534-
if (PQresultStatus(res) != PGRES_TUPLES_OK)
535-
{
536-
fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"),
537-
progname, stream->replication_slot, PQerrorMessage(conn));
538-
PQclear(res);
539-
return false;
540-
}
541-
}
542-
543525
/*
544526
* initialize flush position to starting point, it's the caller's
545527
* responsibility that that's sane.

src/bin/pg_basebackup/receivelog.h

-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ typedef struct StreamCtl
4747
WalWriteMethod *walmethod; /* How to write the WAL */
4848
char *partial_suffix; /* Suffix appended to partially received files */
4949
char *replication_slot; /* Replication slot to use, or NULL */
50-
bool temp_slot; /* Create temporary replication slot */
5150
} StreamCtl;
5251

5352

src/bin/pg_basebackup/streamutil.c

+11-5
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,8 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
398398
*/
399399
bool
400400
CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
401-
bool is_physical, bool slot_exists_ok)
401+
bool is_temporary, bool is_physical, bool reserve_wal,
402+
bool slot_exists_ok)
402403
{
403404
PQExpBuffer query;
404405
PGresult *res;
@@ -410,13 +411,18 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
410411
Assert(slot_name != NULL);
411412

412413
/* Build query */
414+
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
415+
if (is_temporary)
416+
appendPQExpBuffer(query, " TEMPORARY");
413417
if (is_physical)
414-
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
415-
slot_name);
418+
{
419+
appendPQExpBuffer(query, " PHYSICAL");
420+
if (reserve_wal)
421+
appendPQExpBuffer(query, " RESERVE_WAL");
422+
}
416423
else
417424
{
418-
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
419-
slot_name, plugin);
425+
appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
420426
if (PQserverVersion(conn) >= 100000)
421427
/* pg_recvlogical doesn't use an exported snapshot, so suppress */
422428
appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");

src/bin/pg_basebackup/streamutil.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ extern PGconn *GetConnection(void);
3333

3434
/* Replication commands */
3535
extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
36-
const char *plugin, bool is_physical,
37-
bool slot_exists_ok);
36+
const char *plugin, bool is_temporary,
37+
bool is_physical, bool reserve_wal,
38+
bool slot_exists_ok);
3839
extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
3940
extern bool RunIdentifySystem(PGconn *conn, char **sysid,
4041
TimeLineID *starttli,

src/bin/pg_basebackup/t/010_pg_basebackup.pl

+25-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use Config;
55
use PostgresNode;
66
use TestLib;
7-
use Test::More tests => 72;
7+
use Test::More tests => 78;
88

99
program_help_ok('pg_basebackup');
1010
program_version_ok('pg_basebackup');
@@ -259,9 +259,32 @@
259259
[ 'pg_basebackup', '-D',
260260
"$tempdir/backupxs_sl_fail", '-X',
261261
'stream', '-S',
262-
'slot1' ],
262+
'slot0' ],
263263
'pg_basebackup fails with nonexistent replication slot');
264264

265+
$node->command_fails(
266+
[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C' ],
267+
'pg_basebackup -C fails without slot name');
268+
269+
$node->command_fails(
270+
[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0', '--no-slot' ],
271+
'pg_basebackup fails with -C -S --no-slot');
272+
273+
$node->command_ok(
274+
[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0' ],
275+
'pg_basebackup -C runs');
276+
277+
is($node->safe_psql('postgres', q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'slot0'}),
278+
'slot0',
279+
'replication slot was created');
280+
isnt($node->safe_psql('postgres', q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot0'}),
281+
'',
282+
'restart LSN of new slot is not null');
283+
284+
$node->command_fails(
285+
[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot1", '-C', '-S', 'slot0' ],
286+
'pg_basebackup fails with -C -S and a previously existing slot');
287+
265288
$node->safe_psql('postgres',
266289
q{SELECT * FROM pg_create_physical_replication_slot('slot1')});
267290
my $lsn = $node->safe_psql('postgres',

0 commit comments

Comments
 (0)