Skip to content

[pull] master from postgres:master #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions doc/src/sgml/ref/psql-ref.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,15 @@ SELECT $1 \parse stmt1
Outputs information about the current database connection,
including TLS-related information if TLS is in use.
</para>
<para>
Note that the <structfield>Client User</structfield> field shows
the user at the time of connection, while the
<structfield>Superuser</structfield> field indicates whether
the current user (in the current execution context) has
superuser privileges. These users are usually the same, but they can
differ, for example, if the current user was changed with the
<command>SET ROLE</command> command.
</para>
</listitem>
</varlistentry>

Expand Down
11 changes: 7 additions & 4 deletions src/backend/access/nbtree/nbtsearch.c
Original file line number Diff line number Diff line change
Expand Up @@ -2282,9 +2282,12 @@ _bt_readfirstpage(IndexScanDesc scan, OffsetNumber offnum, ScanDirection dir)
* previously-saved right link or left link. lastcurrblkno is the page that
* was current at the point where the blkno link was saved, which we use to
* reason about concurrent page splits/page deletions during backwards scans.
* In the common case where seized=false, blkno is either so->currPos.nextPage
* or so->currPos.prevPage, and lastcurrblkno is so->currPos.currPage.
*
* On entry, caller shouldn't hold any locks or pins on any page (we work
* directly off of blkno and lastcurrblkno instead). Parallel scan callers
* On entry, so->currPos shouldn't be locked by caller. so->currPos.buf must
* be InvalidBuffer/unpinned as needed by caller (note that lastcurrblkno
* won't need to be read again in almost all cases). Parallel scan callers
* that seized the scan before calling here should pass seized=true; such a
* caller's blkno and lastcurrblkno arguments come from the seized scan.
* seized=false callers just pass us the blkno/lastcurrblkno taken from their
Expand All @@ -2301,8 +2304,8 @@ _bt_readfirstpage(IndexScanDesc scan, OffsetNumber offnum, ScanDirection dir)
* success exit (except during so->dropPin index scans, when we drop the pin
* eagerly to avoid blocking VACUUM).
*
* If there are no more matching records in the given direction, we drop all
* locks and pins, invalidate so->currPos, and return false.
* If there are no more matching records in the given direction, we invalidate
* so->currPos (while ensuring it retains no locks or pins), and return false.
*
* We always release the scan for a parallel scan caller, regardless of
* success or failure; we'll call _bt_parallel_release as soon as possible.
Expand Down
4 changes: 4 additions & 0 deletions src/backend/access/transam/xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -7498,6 +7498,10 @@ CreateCheckPoint(int flags)
if (PriorRedoPtr != InvalidXLogRecPtr)
UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);

#ifdef USE_INJECTION_POINTS
INJECTION_POINT("checkpoint-before-old-wal-removal", NULL);
#endif

/*
* Delete old log files, those no longer needed for last checkpoint to
* prevent the disk holding the xlog from growing full.
Expand Down
18 changes: 18 additions & 0 deletions src/backend/replication/logical/logical.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "postgres.h"

#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "fmgr.h"
#include "miscadmin.h"
Expand All @@ -41,6 +42,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/inval.h"
#include "utils/memutils.h"

Expand Down Expand Up @@ -1825,9 +1827,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
bool updated_xmin = false;
bool updated_restart = false;
XLogRecPtr restart_lsn pg_attribute_unused();

SpinLockAcquire(&MyReplicationSlot->mutex);

/* remember the old restart lsn */
restart_lsn = MyReplicationSlot->data.restart_lsn;

/*
* Prevent moving the confirmed_flush backwards, as this could lead to
* data duplication issues caused by replicating already replicated
Expand Down Expand Up @@ -1881,6 +1887,18 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
/* first write new xmin to disk, so we know what's up after a crash */
if (updated_xmin || updated_restart)
{
#ifdef USE_INJECTION_POINTS
XLogSegNo seg1,
seg2;

XLByteToSeg(restart_lsn, seg1, wal_segment_size);
XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);

/* trigger injection point, but only if segment changes */
if (seg1 != seg2)
INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
#endif

ReplicationSlotMarkDirty();
ReplicationSlotSave();
elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
Expand Down
57 changes: 57 additions & 0 deletions src/backend/replication/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->candidate_restart_lsn = InvalidXLogRecPtr;
slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
slot->last_saved_restart_lsn = InvalidXLogRecPtr;
slot->inactive_since = 0;

/*
Expand Down Expand Up @@ -1165,20 +1166,41 @@ ReplicationSlotsComputeRequiredLSN(void)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
XLogRecPtr last_saved_restart_lsn;
bool invalidated;
ReplicationSlotPersistency persistency;

if (!s->in_use)
continue;

SpinLockAcquire(&s->mutex);
persistency = s->data.persistency;
restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
last_saved_restart_lsn = s->last_saved_restart_lsn;
SpinLockRelease(&s->mutex);

/* invalidated slots need not apply */
if (invalidated)
continue;

/*
* For persistent slot use last_saved_restart_lsn to compute the
* oldest LSN for removal of WAL segments. The segments between
* last_saved_restart_lsn and restart_lsn might be needed by a
* persistent slot in the case of database crash. Non-persistent
* slots can't survive the database crash, so we don't care about
* last_saved_restart_lsn for them.
*/
if (persistency == RS_PERSISTENT)
{
if (last_saved_restart_lsn != InvalidXLogRecPtr &&
restart_lsn > last_saved_restart_lsn)
{
restart_lsn = last_saved_restart_lsn;
}
}

if (restart_lsn != InvalidXLogRecPtr &&
(min_required == InvalidXLogRecPtr ||
restart_lsn < min_required))
Expand Down Expand Up @@ -1216,7 +1238,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
{
ReplicationSlot *s;
XLogRecPtr restart_lsn;
XLogRecPtr last_saved_restart_lsn;
bool invalidated;
ReplicationSlotPersistency persistency;

s = &ReplicationSlotCtl->replication_slots[i];

Expand All @@ -1230,14 +1254,33 @@ ReplicationSlotsComputeLogicalRestartLSN(void)

/* read once, it's ok if it increases while we're checking */
SpinLockAcquire(&s->mutex);
persistency = s->data.persistency;
restart_lsn = s->data.restart_lsn;
invalidated = s->data.invalidated != RS_INVAL_NONE;
last_saved_restart_lsn = s->last_saved_restart_lsn;
SpinLockRelease(&s->mutex);

/* invalidated slots need not apply */
if (invalidated)
continue;

/*
* For persistent slot use last_saved_restart_lsn to compute the
* oldest LSN for removal of WAL segments. The segments between
* last_saved_restart_lsn and restart_lsn might be needed by a
* persistent slot in the case of database crash. Non-persistent
* slots can't survive the database crash, so we don't care about
* last_saved_restart_lsn for them.
*/
if (persistency == RS_PERSISTENT)
{
if (last_saved_restart_lsn != InvalidXLogRecPtr &&
restart_lsn > last_saved_restart_lsn)
{
restart_lsn = last_saved_restart_lsn;
}
}

if (restart_lsn == InvalidXLogRecPtr)
continue;

Expand Down Expand Up @@ -1455,6 +1498,7 @@ ReplicationSlotReserveWal(void)

Assert(slot != NULL);
Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);

/*
* The replication slot mechanism is used to prevent removal of required
Expand Down Expand Up @@ -1766,6 +1810,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
*/
SpinLockAcquire(&s->mutex);

Assert(s->data.restart_lsn >= s->last_saved_restart_lsn);

restart_lsn = s->data.restart_lsn;

/* we do nothing if the slot is already invalid */
Expand Down Expand Up @@ -1835,7 +1881,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
* just rely on .invalidated.
*/
if (invalidation_cause == RS_INVAL_WAL_REMOVED)
{
s->data.restart_lsn = InvalidXLogRecPtr;
s->last_saved_restart_lsn = InvalidXLogRecPtr;
}

/* Let caller know */
*invalidated = true;
Expand Down Expand Up @@ -2079,6 +2128,12 @@ CheckPointReplicationSlots(bool is_shutdown)
SaveSlotToPath(s, path, LOG);
}
LWLockRelease(ReplicationSlotAllocationLock);

/*
* Recompute the required LSN as SaveSlotToPath() updated
* last_saved_restart_lsn for slots.
*/
ReplicationSlotsComputeRequiredLSN();
}

/*
Expand Down Expand Up @@ -2354,6 +2409,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
if (!slot->just_dirtied)
slot->dirty = false;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
SpinLockRelease(&slot->mutex);

LWLockRelease(&slot->io_in_progress_lock);
Expand Down Expand Up @@ -2569,6 +2625,7 @@ RestoreSlotFromDisk(const char *name)
slot->effective_xmin = cp.slotdata.xmin;
slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;

slot->candidate_catalog_xmin = InvalidTransactionId;
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
Expand Down
5 changes: 4 additions & 1 deletion src/bin/psql/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch)
int ssl_in_use,
password_used,
gssapi_used;
int version_num;
char *paramval;

if (!active_branch)
Expand All @@ -793,7 +794,9 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch)
/* Get values for the parameters */
host = PQhost(pset.db);
hostaddr = PQhostaddr(pset.db);
protocol_version = psprintf("%d", PQprotocolVersion(pset.db));
version_num = PQfullProtocolVersion(pset.db);
protocol_version = psprintf("%d.%d", version_num / 10000,
version_num % 10000);
ssl_in_use = PQsslInUse(pset.db);
password_used = PQconnectionUsedPassword(pset.db);
gssapi_used = PQconnectionUsedGSSAPI(pset.db);
Expand Down
8 changes: 8 additions & 0 deletions src/include/replication/slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ typedef struct ReplicationSlot
* recently stopped.
*/
TimestampTz inactive_since;

/*
* Latest restart_lsn that has been flushed to disk. For persistent slots
* the flushed LSN should be taken into account when calculating the
* oldest LSN for WAL segments removal.
*/
XLogRecPtr last_saved_restart_lsn;

} ReplicationSlot;

#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
Expand Down
2 changes: 2 additions & 0 deletions src/test/recovery/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ tests += {
't/043_no_contrecord_switch.pl',
't/044_invalidate_inactive_slots.pl',
't/045_archive_restartpoint.pl',
't/046_checkpoint_logical_slot.pl',
't/047_checkpoint_physical_slot.pl'
],
},
}
Loading