Use stack for buffer/WAL usage instrumentation #14

Open · wants to merge 8 commits into master
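The instrument.h/instrument.c side of this series is not part of the excerpt; only the call sites are shown below. As a reading aid, here is a minimal sketch of what the stack-based API could look like, inferred from how the callers use it. The identifiers InstrumentUsage, InstrUsageStart, InstrUsageStop, InstrUsageAddToCurrent, and InstrumentUsageActive are taken from the diff; the struct layout, the linked-list stack, and the memory handling are assumptions, not the patch's actual implementation.

/*
 * Hypothetical sketch of the stack-based usage-tracking API.  Only the
 * identifier names come from the patch; everything else is assumed.
 * BufferUsageAdd/WalUsageAdd are the existing file-local helpers in
 * src/backend/executor/instrument.c.
 */
typedef struct InstrumentUsage
{
	BufferUsage bufusage;		/* buffer counters for this frame */
	WalUsage	walusage;		/* WAL counters for this frame */
	struct InstrumentUsage *parent; /* enclosing frame, or NULL */
} InstrumentUsage;

static InstrumentUsage *instr_usage_stack = NULL;

/* Push a fresh frame; subsequent counter updates accumulate into it. */
void
InstrUsageStart(void)
{
	/*
	 * A real implementation must pick a memory context that survives
	 * until the matching InstrUsageStop(), and clean up on error.
	 */
	InstrumentUsage *frame = MemoryContextAllocZero(TopMemoryContext,
													sizeof(InstrumentUsage));

	frame->parent = instr_usage_stack;
	instr_usage_stack = frame;
}

/*
 * Pop and return the current frame.  A real implementation would also
 * fold the popped counters into the parent frame, so an enclosing
 * tracker (say, a top-level statement) still sees nested activity.
 */
InstrumentUsage *
InstrUsageStop(void)
{
	InstrumentUsage *frame = instr_usage_stack;

	Assert(frame != NULL);
	instr_usage_stack = frame->parent;
	return frame;
}

/* True if at least one caller is currently collecting usage data. */
bool
InstrumentUsageActive(void)
{
	return instr_usage_stack != NULL;
}

/* Fold a detached frame (e.g. a parallel worker's) into the current one. */
void
InstrUsageAddToCurrent(InstrumentUsage *usage)
{
	Assert(instr_usage_stack != NULL);
	BufferUsageAdd(&instr_usage_stack->bufusage, &usage->bufusage);
	WalUsageAdd(&instr_usage_stack->walusage, &usage->walusage);
}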
52 changes: 12 additions & 40 deletions contrib/pg_stat_statements/pg_stat_statements.c
@@ -901,21 +901,11 @@ pgss_planner(Query *parse,
&& pgss_track_planning && query_string
&& parse->queryId != UINT64CONST(0))
{
InstrumentUsage *usage;
instr_time start;
instr_time duration;
BufferUsage bufusage_start,
bufusage;
WalUsage walusage_start,
walusage;

/* We need to track buffer usage as the planner can access them. */
bufusage_start = pgBufferUsage;

/*
* Similarly the planner could write some WAL records in some cases
* (e.g. setting a hint bit with those being WAL-logged)
*/
walusage_start = pgWalUsage;
InstrUsageStart();
INSTR_TIME_SET_CURRENT(start);

nesting_level++;
@@ -936,14 +926,7 @@ pgss_planner(Query *parse,

INSTR_TIME_SET_CURRENT(duration);
INSTR_TIME_SUBTRACT(duration, start);

/* calc differences of buffer counters. */
memset(&bufusage, 0, sizeof(BufferUsage));
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);

/* calc differences of WAL counters. */
memset(&walusage, 0, sizeof(WalUsage));
WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start);
usage = InstrUsageStop();

pgss_store(query_string,
parse->queryId,
@@ -952,8 +935,8 @@
PGSS_PLAN,
INSTR_TIME_GET_MILLISEC(duration),
0,
&bufusage,
&walusage,
&usage->bufusage,
&usage->walusage,
NULL,
NULL,
0,
@@ -1094,8 +1077,8 @@ pgss_ExecutorEnd(QueryDesc *queryDesc)
PGSS_EXEC,
queryDesc->totaltime->total * 1000.0, /* convert to msec */
queryDesc->estate->es_total_processed,
&queryDesc->totaltime->bufusage,
&queryDesc->totaltime->walusage,
&queryDesc->totaltime->instrusage.bufusage,
&queryDesc->totaltime->instrusage.walusage,
queryDesc->estate->es_jit ? &queryDesc->estate->es_jit->instr : NULL,
NULL,
queryDesc->estate->es_parallel_workers_to_launch,
@@ -1158,16 +1141,12 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
!IsA(parsetree, ExecuteStmt) &&
!IsA(parsetree, PrepareStmt))
{
InstrumentUsage *usage;
instr_time start;
instr_time duration;
uint64 rows;
BufferUsage bufusage_start,
bufusage;
WalUsage walusage_start,
walusage;

bufusage_start = pgBufferUsage;
walusage_start = pgWalUsage;
InstrUsageStart();
INSTR_TIME_SET_CURRENT(start);

nesting_level++;
@@ -1200,6 +1179,7 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,

INSTR_TIME_SET_CURRENT(duration);
INSTR_TIME_SUBTRACT(duration, start);
usage = InstrUsageStop();

/*
* Track the total number of rows retrieved or affected by the utility
@@ -1212,23 +1192,15 @@ pgss_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
qc->commandTag == CMDTAG_REFRESH_MATERIALIZED_VIEW)) ?
qc->nprocessed : 0;

/* calc differences of buffer counters. */
memset(&bufusage, 0, sizeof(BufferUsage));
BufferUsageAccumDiff(&bufusage, &pgBufferUsage, &bufusage_start);

/* calc differences of WAL counters. */
memset(&walusage, 0, sizeof(WalUsage));
WalUsageAccumDiff(&walusage, &pgWalUsage, &walusage_start);

pgss_store(queryString,
saved_queryId,
saved_stmt_location,
saved_stmt_len,
PGSS_EXEC,
INSTR_TIME_GET_MILLISEC(duration),
rows,
&bufusage,
&walusage,
&usage->bufusage,
&usage->walusage,
NULL,
NULL,
0,
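Net effect in this file: each hook replaces the old snapshot-and-diff dance (save pgBufferUsage/pgWalUsage, run the operation, then BufferUsageAccumDiff/WalUsageAccumDiff) with a push (InstrUsageStart) before the operation and a pop (InstrUsageStop) after it. The pgss_ExecutorEnd hunk additionally implies that struct Instrumentation now embeds a single InstrumentUsage where it previously carried separate bufusage and walusage fields. A sketch of that assumed struct change, abridged to the relevant members (the surrounding fields are from the existing definition in instrument.h):

/* Assumed post-patch shape of struct Instrumentation (abridged), inferred
 * from queryDesc->totaltime->instrusage.bufusage above. */
typedef struct Instrumentation
{
	bool		need_timer;		/* true if we need timer data */
	bool		need_bufusage;	/* true if we need buffer usage data */
	bool		running;		/* true if we've completed first tuple */
	instr_time	starttime;		/* start time of current iteration */
	double		total;			/* total time (in seconds) */
	double		ntuples;		/* total tuples produced */
	InstrumentUsage instrusage;	/* replaces the separate BufferUsage
								 * bufusage and WalUsage walusage fields */
} Instrumentation;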
73 changes: 35 additions & 38 deletions src/backend/access/brin/brin.c
@@ -47,8 +47,7 @@
#define PARALLEL_KEY_BRIN_SHARED UINT64CONST(0xB000000000000001)
#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)
#define PARALLEL_KEY_INSTR_USAGE UINT64CONST(0xB000000000000004)

/*
* Status for index builds performed in parallel. This is allocated in a
@@ -144,8 +143,7 @@ typedef struct BrinLeader
BrinShared *brinshared;
Sharedsort *sharedsort;
Snapshot snapshot;
WalUsage *walusage;
BufferUsage *bufferusage;
InstrumentUsage *instrusage;
} BrinLeader;

/*
@@ -2371,8 +2369,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
BrinShared *brinshared;
Sharedsort *sharedsort;
BrinLeader *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
WalUsage *walusage;
BufferUsage *bufferusage;
InstrumentUsage *instrusage = NULL;
bool leaderparticipates = true;
int querylen;

@@ -2414,19 +2411,14 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
shm_toc_estimate_keys(&pcxt->estimator, 2);

/*
* Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
* and PARALLEL_KEY_BUFFER_USAGE.
*
* If there are no extensions loaded that care, we could skip this. We
* have no way of knowing whether anyone's looking at pgWalUsage or
* pgBufferUsage, so do it unconditionally.
* Estimate space for InstrUsage -- PARALLEL_KEY_INSTR_USAGE, if needed.
*/
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
if (InstrumentUsageActive())
{
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(InstrumentUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
if (debug_query_string)
@@ -2501,12 +2493,12 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
* Allocate space for each worker's InstrumentUsage; no need to initialize.
*/
walusage = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
bufferusage = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
if (InstrumentUsageActive())
{
instrusage = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(InstrumentUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTR_USAGE, instrusage);
}

/* Launch workers, saving status for leader/caller */
LaunchParallelWorkers(pcxt);
@@ -2517,8 +2509,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
brinleader->brinshared = brinshared;
brinleader->sharedsort = sharedsort;
brinleader->snapshot = snapshot;
brinleader->walusage = walusage;
brinleader->bufferusage = bufferusage;
brinleader->instrusage = instrusage;

/* If no workers were successfully launched, back out (do serial build) */
if (pcxt->nworkers_launched == 0)
@@ -2553,11 +2544,14 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
WaitForParallelWorkersToFinish(brinleader->pcxt);

/*
* Next, accumulate WAL usage. (This must wait for the workers to finish,
* or we might get incomplete data.)
* Next, accumulate usage data. (This must wait for the workers to
* finish, or we might get incomplete data.)
*/
for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
if (InstrumentUsageActive())
{
for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
InstrUsageAddToCurrent(&brinleader->instrusage[i]);
}

/* Free last reference to MVCC snapshot, if one was used */
if (IsMVCCSnapshot(brinleader->snapshot))
@@ -2870,8 +2864,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
Relation indexRel;
LOCKMODE heapLockmode;
LOCKMODE indexLockmode;
WalUsage *walusage;
BufferUsage *bufferusage;
InstrumentUsage *shm_usage;
int sortmem;

/*
@@ -2918,8 +2911,10 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
tuplesort_attach_shared(sharedsort, seg);

/* Prepare to track buffer usage during parallel execution */
InstrStartParallelQuery();
/* Prepare to track buffer usage during parallel execution, if needed */
shm_usage = shm_toc_lookup(toc, PARALLEL_KEY_INSTR_USAGE, true);
if (shm_usage)
InstrUsageStart();

/*
* Might as well use reliable figure when doling out maintenance_work_mem
@@ -2932,10 +2927,12 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
heapRel, indexRel, sortmem, false);

/* Report WAL/buffer usage during parallel execution, if tracking is active */
bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
&walusage[ParallelWorkerNumber]);
if (shm_usage)
{
InstrumentUsage *usage = InstrUsageStop();

memcpy(&shm_usage[ParallelWorkerNumber], usage, sizeof(InstrumentUsage));
}

index_close(indexRel, indexLockmode);
table_close(heapRel, heapLockmode);
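The brin.c changes establish a pattern that repeats nearly verbatim in gininsert.c below: the leader reserves one InstrumentUsage slot per worker in the parallel DSM table of contents only when tracking is active, each worker copies its popped frame into its slot, and the leader folds all slots into its own frame once the workers have finished. Condensed into one place, and assuming the API sketched near the top, the round trip looks roughly like this (the helper names leader_setup_usage_slots, worker_report_usage, and leader_accumulate_usage are made up for this sketch, and the shm_toc_estimate_* step is omitted for brevity):

/* Leader, while filling the DSM segment: publish one slot per worker,
 * but only if some extension is actually collecting usage. */
static InstrumentUsage *
leader_setup_usage_slots(ParallelContext *pcxt)
{
	InstrumentUsage *slots = NULL;

	if (InstrumentUsageActive())
	{
		slots = shm_toc_allocate(pcxt->toc,
								 mul_size(sizeof(InstrumentUsage),
										  pcxt->nworkers));
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTR_USAGE, slots);
	}
	return slots;
}

/* Worker: the TOC entry is optional, so look it up with noError = true;
 * a NULL result means the leader was not tracking, and we skip it too. */
static void
worker_report_usage(shm_toc *toc, void (*do_build) (void))
{
	InstrumentUsage *slots;

	slots = shm_toc_lookup(toc, PARALLEL_KEY_INSTR_USAGE, true);
	if (slots)
		InstrUsageStart();

	do_build();

	if (slots)
		memcpy(&slots[ParallelWorkerNumber], InstrUsageStop(),
			   sizeof(InstrumentUsage));
}

/* Leader, after WaitForParallelWorkersToFinish(): fold each worker's
 * counters into the current frame. */
static void
leader_accumulate_usage(InstrumentUsage *slots, int nworkers_launched)
{
	if (InstrumentUsageActive())
	{
		for (int i = 0; i < nworkers_launched; i++)
			InstrUsageAddToCurrent(&slots[i]);
	}
}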
67 changes: 32 additions & 35 deletions src/backend/access/gin/gininsert.c
@@ -39,8 +39,7 @@
#define PARALLEL_KEY_GIN_SHARED UINT64CONST(0xB000000000000001)
#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xB000000000000002)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xB000000000000003)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xB000000000000004)
#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xB000000000000005)
#define PARALLEL_KEY_INSTR_USAGE UINT64CONST(0xB000000000000004)

/*
* Status for index builds performed in parallel. This is allocated in a
@@ -132,8 +131,7 @@ typedef struct GinLeader
GinBuildShared *ginshared;
Sharedsort *sharedsort;
Snapshot snapshot;
WalUsage *walusage;
BufferUsage *bufferusage;
InstrumentUsage *instrusage;
} GinLeader;

typedef struct
@@ -904,8 +902,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
GinBuildShared *ginshared;
Sharedsort *sharedsort;
GinLeader *ginleader = (GinLeader *) palloc0(sizeof(GinLeader));
WalUsage *walusage;
BufferUsage *bufferusage;
InstrumentUsage *instrusage = NULL;
bool leaderparticipates = true;
int querylen;

@@ -946,19 +943,14 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
shm_toc_estimate_keys(&pcxt->estimator, 2);

/*
* Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
* and PARALLEL_KEY_BUFFER_USAGE.
*
* If there are no extensions loaded that care, we could skip this. We
* have no way of knowing whether anyone's looking at pgWalUsage or
* pgBufferUsage, so do it unconditionally.
* Estimate space for InstrUsage -- PARALLEL_KEY_INSTR_USAGE, if needed.
*/
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
if (InstrumentUsageActive())
{
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(InstrumentUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
if (debug_query_string)
@@ -1028,12 +1020,12 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
* Allocate space for each worker's InstrumentUsage; no need to initialize.
*/
walusage = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
bufferusage = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
if (InstrumentUsageActive())
{
instrusage = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(InstrumentUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTR_USAGE, instrusage);
}

/* Launch workers, saving status for leader/caller */
LaunchParallelWorkers(pcxt);
@@ -1044,8 +1036,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
ginleader->ginshared = ginshared;
ginleader->sharedsort = sharedsort;
ginleader->snapshot = snapshot;
ginleader->walusage = walusage;
ginleader->bufferusage = bufferusage;
ginleader->instrusage = instrusage;

/* If no workers were successfully launched, back out (do serial build) */
if (pcxt->nworkers_launched == 0)
@@ -1083,8 +1074,11 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state)
* Next, accumulate usage data. (This must wait for the workers to
* finish, or we might get incomplete data.)
*/
for (i = 0; i < ginleader->pcxt->nworkers_launched; i++)
InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]);
if (InstrumentUsageActive())
{
for (i = 0; i < ginleader->pcxt->nworkers_launched; i++)
InstrUsageAddToCurrent(&ginleader->instrusage[i]);
}

/* Free last reference to MVCC snapshot, if one was used */
if (IsMVCCSnapshot(ginleader->snapshot))
@@ -2079,8 +2073,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
Relation indexRel;
LOCKMODE heapLockmode;
LOCKMODE indexLockmode;
WalUsage *walusage;
BufferUsage *bufferusage;
InstrumentUsage *shm_usage;
int sortmem;

/*
@@ -2147,7 +2140,9 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
tuplesort_attach_shared(sharedsort, seg);

/* Prepare to track buffer usage during parallel execution, if needed */
InstrStartParallelQuery();
shm_usage = shm_toc_lookup(toc, PARALLEL_KEY_INSTR_USAGE, true);
if (shm_usage)
InstrUsageStart();

/*
* Might as well use reliable figure when doling out maintenance_work_mem
@@ -2160,10 +2155,12 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
heapRel, indexRel, sortmem, false);

/* Report WAL/buffer usage during parallel execution, if tracking is active */
bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
&walusage[ParallelWorkerNumber]);
if (shm_usage)
{
InstrumentUsage *usage = InstrUsageStop();

memcpy(&shm_usage[ParallelWorkerNumber], usage, sizeof(InstrumentUsage));
}

index_close(indexRel, indexLockmode);
table_close(heapRel, heapLockmode);
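A closing design note. The worker-side shm_toc_lookup() calls now pass true for the noError argument because PARALLEL_KEY_INSTR_USAGE exists in the table of contents only when the leader was tracking at launch time; a NULL result tells the worker to skip instrumentation entirely. That is the behavioral payoff of the series: the deleted comments above conceded that "we have no way of knowing whether anyone's looking at pgWalUsage or pgBufferUsage, so do it unconditionally", whereas the stack makes that question answerable through InstrumentUsageActive(), so parallel index builds that nobody is instrumenting no longer pay for the shared-memory slots or the copy-back.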