Skip to content

Commit 24da029

Browse files
danolivoAlena Rybakina
authored and
Alena Rybakina
committed
Be more careful with locks of relations and syscaches in get_list_of_relids() routine
Switch on feature 'search on neighbour feature spaces' by a GUC (disabled by default). Some mistakes fixed.
1 parent 428a899 commit 24da029

8 files changed

+102
-30
lines changed

aqo.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,19 @@ _PG_init(void)
228228
NULL
229229
);
230230

231+
DefineCustomBoolVariable(
232+
"aqo.wide_search",
233+
"Search ML data in neighbour feature spaces.",
234+
NULL,
235+
&use_wide_search,
236+
false,
237+
PGC_USERSET,
238+
0,
239+
NULL,
240+
NULL,
241+
NULL
242+
);
243+
231244
DefineCustomIntVariable("aqo.join_threshold",
232245
"Sets the threshold of number of JOINs in query beyond which AQO is used.",
233246
NULL,

aqo.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ extern bool force_collect_stat;
173173
extern bool aqo_show_hash;
174174
extern bool aqo_show_details;
175175
extern int aqo_join_threshold;
176+
extern bool use_wide_search;
176177

177178
/* Parameters for current query */
178179
typedef struct QueryContextData

aqo_shared.c

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,16 +191,18 @@ aqo_init_shmem(void)
191191
{
192192
/* First time through ... */
193193

194-
LWLockInitialize(&aqo_state->lock, LWLockNewTrancheId());
195194
aqo_state->dsm_handler = DSM_HANDLE_INVALID;
196-
197195
aqo_state->qtexts_dsa_handler = DSM_HANDLE_INVALID;
196+
aqo_state->data_dsa_handler = DSM_HANDLE_INVALID;
197+
198198
aqo_state->qtext_trancheid = LWLockNewTrancheId();
199+
199200
aqo_state->qtexts_changed = false;
200-
aqo_state->data_dsa_handler = DSM_HANDLE_INVALID;
201+
aqo_state->stat_changed = false;
201202
aqo_state->data_changed = false;
202203
aqo_state->queries_changed = false;
203204

205+
LWLockInitialize(&aqo_state->lock, LWLockNewTrancheId());
204206
LWLockInitialize(&aqo_state->stat_lock, LWLockNewTrancheId());
205207
LWLockInitialize(&aqo_state->qtexts_lock, LWLockNewTrancheId());
206208
LWLockInitialize(&aqo_state->data_lock, LWLockNewTrancheId());
@@ -245,7 +247,7 @@ aqo_init_shmem(void)
245247
LWLockRegisterTranche(aqo_state->data_lock.tranche, "AQO Data Lock Tranche");
246248
LWLockRegisterTranche(aqo_state->queries_lock.tranche, "AQO Queries Lock Tranche");
247249

248-
if (!IsUnderPostmaster)
250+
if (!IsUnderPostmaster && !found)
249251
{
250252
before_shmem_exit(on_shmem_shutdown, (Datum) 0);
251253

@@ -261,8 +263,16 @@ aqo_init_shmem(void)
261263
static void
262264
on_shmem_shutdown(int code, Datum arg)
263265
{
266+
Assert(!IsUnderPostmaster);
267+
268+
/*
269+
* Save ML data to a permanent storage. Do it on postmaster shutdown only
270+
* to save time. We can't do so for query_texts and aqo_data because of DSM
271+
* limits.
272+
*/
264273
aqo_stat_flush();
265274
aqo_queries_flush();
275+
return;
266276
}
267277

268278
Size

cardinality_estimation.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
#include "machine_learning.h"
2525
#include "storage.h"
2626

27+
28+
bool use_wide_search = false;
29+
2730
#ifdef AQO_DEBUG_PRINT
2831
static void
2932
predict_debug_output(List *clauses, List *selectivities,
@@ -90,7 +93,7 @@ predict_for_relation(List *clauses, List *selectivities, List *relsigns,
9093
*/
9194

9295
/* Try to search in surrounding feature spaces for the same node */
93-
if (!load_aqo_data(query_context.fspace_hash, *fss, data, NULL, true))
96+
if (!load_aqo_data(query_context.fspace_hash, *fss, data, NULL, use_wide_search))
9497
result = -1;
9598
else
9699
{

conf.add

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
autovacuum = off
22
shared_preload_libraries = 'postgres_fdw, aqo'
33
max_parallel_workers_per_gather = 1 # switch off parallel workers because of unsteadiness
4+
aqo.wide_search = 'on'

path_utils.c

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ hashTempTupleDesc(TupleDesc desc)
151151
return s;
152152
}
153153

154+
#include "storage/lmgr.h"
155+
154156
/*
155157
* Get list of relation indexes and prepare list of permanent table reloids,
156158
* list of temporary table reloids (can be changed between query launches) and
@@ -173,6 +175,8 @@ get_list_of_relids(PlannerInfo *root, Relids relids, RelSortOut *rels)
173175
HeapTuple htup;
174176
Form_pg_class classForm;
175177
char *relname = NULL;
178+
Oid relrewrite;
179+
char relpersistence;
176180

177181
entry = planner_rt_fetch(index, root);
178182

@@ -187,34 +191,39 @@ get_list_of_relids(PlannerInfo *root, Relids relids, RelSortOut *rels)
187191
if (!HeapTupleIsValid(htup))
188192
elog(PANIC, "cache lookup failed for reloid %u", entry->relid);
189193

194+
/* Copy the fields from syscache and release the slot as quickly as possible. */
190195
classForm = (Form_pg_class) GETSTRUCT(htup);
196+
relpersistence = classForm->relpersistence;
197+
relrewrite = classForm->relrewrite;
198+
relname = pstrdup(NameStr(classForm->relname));
199+
ReleaseSysCache(htup);
191200

192-
if (classForm->relpersistence == RELPERSISTENCE_TEMP)
201+
if (relpersistence == RELPERSISTENCE_TEMP)
193202
{
194203
/* The case of temporary table */
195204

196-
Relation trel = relation_open(entry->relid, NoLock);
197-
TupleDesc tdesc = RelationGetDescr(trel);
205+
Relation trel;
206+
TupleDesc tdesc;
198207

208+
trel = relation_open(entry->relid, NoLock);
209+
tdesc = RelationGetDescr(trel);
210+
Assert(CheckRelationLockedByMe(trel, AccessShareLock, true));
199211
hashes = lappend_uint64(hashes, hashTempTupleDesc(tdesc));
200212
relation_close(trel, NoLock);
201213
}
202214
else
203215
{
204216
/* The case of regular table */
205217
relname = quote_qualified_identifier(
206-
get_namespace_name(get_rel_namespace(entry->relid)),
207-
classForm->relrewrite ?
208-
get_rel_name(classForm->relrewrite) :
209-
NameStr(classForm->relname));
218+
get_namespace_name(get_rel_namespace(entry->relid)),
219+
relrewrite ? get_rel_name(relrewrite) : relname);
220+
210221
hashes = lappend_uint64(hashes, DatumGetInt64(hash_any_extended(
211222
(unsigned char *) relname,
212223
strlen(relname), 0)));
213224

214225
hrels = lappend_oid(hrels, entry->relid);
215226
}
216-
217-
ReleaseSysCache(htup);
218227
}
219228

220229
rels->hrels = list_concat(rels->hrels, hrels);

postprocessing.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ restore_selectivities(List *clauselist, List *relidslist, JoinType join_type,
173173
{
174174
List *lst = NIL;
175175
ListCell *l;
176-
int i = 0;
177176
bool parametrized_sel;
178177
int nargs;
179178
int *args_hash;
@@ -222,7 +221,6 @@ restore_selectivities(List *clauselist, List *relidslist, JoinType join_type,
222221
Assert(cur_sel > 0);
223222

224223
lst = lappend(lst, cur_sel);
225-
i++;
226224
}
227225

228226
if (parametrized_sel)

storage.c

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,9 @@ aqo_stat_store(uint64 queryid, bool use_aqo,
303303
entry->exec_time[pos] = exec_time;
304304
entry->est_error[pos] = est_error;
305305
}
306+
306307
entry = memcpy(palloc(sizeof(StatEntry)), entry, sizeof(StatEntry));
308+
aqo_state->stat_changed = true;
307309
LWLockRelease(&aqo_state->stat_lock);
308310
return entry;
309311
}
@@ -425,14 +427,24 @@ aqo_stat_flush(void)
425427
int ret;
426428
long entries;
427429

428-
LWLockAcquire(&aqo_state->stat_lock, LW_SHARED);
430+
/* Use exclusive lock to prevent concurrent flushing in different backends. */
431+
LWLockAcquire(&aqo_state->stat_lock, LW_EXCLUSIVE);
432+
433+
if (!aqo_state->stat_changed)
434+
/* Hash table wasn't changed, meaningless to store it in permanent storage */
435+
goto end;
436+
429437
entries = hash_get_num_entries(stat_htab);
430438
hash_seq_init(&hash_seq, stat_htab);
431439
ret = data_store(PGAQO_STAT_FILE, _form_stat_record_cb, entries,
432440
(void *) &hash_seq);
433441
if (ret != 0)
434442
hash_seq_term(&hash_seq);
443+
else
444+
/* Hash table and disk storage are now consistent */
445+
aqo_state->stat_changed = false;
435446

447+
end:
436448
LWLockRelease(&aqo_state->stat_lock);
437449
}
438450

@@ -469,7 +481,7 @@ aqo_qtexts_flush(void)
469481
long entries;
470482

471483
dsa_init();
472-
LWLockAcquire(&aqo_state->qtexts_lock, LW_SHARED);
484+
LWLockAcquire(&aqo_state->qtexts_lock, LW_EXCLUSIVE);
473485

474486
if (!aqo_state->qtexts_changed)
475487
/* XXX: mull over forced mode. */
@@ -481,7 +493,9 @@ aqo_qtexts_flush(void)
481493
(void *) &hash_seq);
482494
if (ret != 0)
483495
hash_seq_term(&hash_seq);
484-
aqo_state->qtexts_changed = false;
496+
else
497+
/* Hash table and disk storage are now consistent */
498+
aqo_state->qtexts_changed = false;
485499

486500
end:
487501
LWLockRelease(&aqo_state->qtexts_lock);
@@ -531,7 +545,7 @@ aqo_data_flush(void)
531545
long entries;
532546

533547
dsa_init();
534-
LWLockAcquire(&aqo_state->data_lock, LW_SHARED);
548+
LWLockAcquire(&aqo_state->data_lock, LW_EXCLUSIVE);
535549

536550
if (!aqo_state->data_changed)
537551
/* XXX: mull over forced mode. */
@@ -548,6 +562,7 @@ aqo_data_flush(void)
548562
*/
549563
hash_seq_term(&hash_seq);
550564
else
565+
/* Hash table and disk storage are now consistent */
551566
aqo_state->data_changed = false;
552567
end:
553568
LWLockRelease(&aqo_state->data_lock);
@@ -574,14 +589,22 @@ aqo_queries_flush(void)
574589
int ret;
575590
long entries;
576591

577-
LWLockAcquire(&aqo_state->queries_lock, LW_SHARED);
592+
LWLockAcquire(&aqo_state->queries_lock, LW_EXCLUSIVE);
593+
594+
if (!aqo_state->queries_changed)
595+
goto end;
596+
578597
entries = hash_get_num_entries(queries_htab);
579598
hash_seq_init(&hash_seq, queries_htab);
580599
ret = data_store(PGAQO_QUERIES_FILE, _form_queries_record_cb, entries,
581600
(void *) &hash_seq);
582601
if (ret != 0)
583602
hash_seq_term(&hash_seq);
603+
else
604+
/* Hash table and disk storage are now consistent */
605+
aqo_state->queries_changed = false;
584606

607+
end:
585608
LWLockRelease(&aqo_state->queries_lock);
586609
}
587610

@@ -621,7 +644,8 @@ data_store(const char *filename, form_record_t callback,
621644
goto error;
622645
}
623646

624-
(void) durable_rename(tmpfile, filename, LOG);
647+
/* Parallel (re)writing into a file haven't happen. */
648+
(void) durable_rename(tmpfile, filename, PANIC);
625649
elog(LOG, "[AQO] %d records stored in file %s.", counter, filename);
626650
return 0;
627651

@@ -839,7 +863,7 @@ aqo_queries_load(void)
839863

840864
LWLockAcquire(&aqo_state->queries_lock, LW_EXCLUSIVE);
841865

842-
/* Load on postmaster sturtup. So no any concurrent actions possible here. */
866+
/* Load on postmaster startup. So no any concurrent actions possible here. */
843867
Assert(hash_get_num_entries(queries_htab) == 0);
844868

845869
data_load(PGAQO_QUERIES_FILE, _deform_queries_record_cb, NULL);
@@ -926,6 +950,9 @@ data_load(const char *filename, deform_record_t callback, void *ctx)
926950
static void
927951
on_shmem_shutdown(int code, Datum arg)
928952
{
953+
/*
954+
* XXX: It can be expensive to rewrite a file on each shutdown of a backend.
955+
*/
929956
aqo_qtexts_flush();
930957
aqo_data_flush();
931958
}
@@ -1201,6 +1228,7 @@ _aqo_data_remove(data_key *key)
12011228

12021229
if (hash_search(data_htab, key, HASH_REMOVE, NULL) == NULL)
12031230
elog(PANIC, "[AQO] Inconsistent data hash table");
1231+
12041232
aqo_state->data_changed = true;
12051233
}
12061234

@@ -1270,8 +1298,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
12701298
char *ptr;
12711299
ListCell *lc;
12721300
size_t size;
1273-
bool tblOverflow;
1274-
HASHACTION action;
1301+
bool tblOverflow;
1302+
HASHACTION action;
1303+
bool result;
12751304

12761305
Assert(!LWLockHeldByMe(&aqo_state->data_lock));
12771306

@@ -1387,8 +1416,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
13871416

13881417
aqo_state->data_changed = true;
13891418
end:
1419+
result = aqo_state->data_changed;
13901420
LWLockRelease(&aqo_state->data_lock);
1391-
return aqo_state->data_changed;
1421+
return result;
13921422
}
13931423

13941424
static void
@@ -1496,7 +1526,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
14961526

14971527
dsa_init();
14981528

1499-
LWLockAcquire(&aqo_state->data_lock, LW_EXCLUSIVE);
1529+
LWLockAcquire(&aqo_state->data_lock, LW_SHARED);
15001530

15011531
if (!wideSearch)
15021532
{
@@ -1631,7 +1661,8 @@ aqo_data(PG_FUNCTION_ARGS)
16311661
ptr += sizeof(data_key);
16321662

16331663
if (entry->cols > 0)
1634-
values[AD_FEATURES] = PointerGetDatum(form_matrix((double *)ptr, entry->rows, entry->cols));
1664+
values[AD_FEATURES] = PointerGetDatum(form_matrix((double *) ptr,
1665+
entry->rows, entry->cols));
16351666
else
16361667
nulls[AD_FEATURES] = true;
16371668

@@ -1719,7 +1750,9 @@ aqo_data_reset(void)
17191750
elog(ERROR, "[AQO] hash table corrupted");
17201751
num_remove++;
17211752
}
1722-
aqo_state->data_changed = true;
1753+
1754+
if (num_remove > 0)
1755+
aqo_state->data_changed = true;
17231756
LWLockRelease(&aqo_state->data_lock);
17241757
if (num_remove != num_entries)
17251758
elog(ERROR, "[AQO] Query ML memory storage is corrupted or parallel access without a lock has detected.");
@@ -1831,6 +1864,7 @@ aqo_queries_store(uint64 queryid,
18311864
entry->use_aqo = use_aqo;
18321865
entry->auto_tuning = auto_tuning;
18331866

1867+
aqo_state->queries_changed = true;
18341868
LWLockRelease(&aqo_state->queries_lock);
18351869
return true;
18361870
}
@@ -1856,7 +1890,10 @@ aqo_queries_reset(void)
18561890
elog(ERROR, "[AQO] hash table corrupted");
18571891
num_remove++;
18581892
}
1859-
aqo_state->queries_changed = true;
1893+
1894+
if (num_remove > 0)
1895+
aqo_state->queries_changed = true;
1896+
18601897
LWLockRelease(&aqo_state->queries_lock);
18611898

18621899
if (num_remove != num_entries - 1)

0 commit comments

Comments
 (0)