Skip to content

Commit 6a8aefc

Browse files
committed
Fix race condition
1 parent e7741b9 commit 6a8aefc

File tree

4 files changed

+41
-27
lines changed

4 files changed

+41
-27
lines changed

bench/bench.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,24 +30,28 @@ def cwd(path):
3030
os.chdir(curdir)
3131

3232

33-
async def insert_data(con, files, table_name, table_name_c):
34-
while True:
35-
try:
36-
filename = files.pop()
33+
async def insert_data(node, files, table_name, table_name_c):
34+
sql = 'insert into {} values ($1)'
35+
sql1 = sql.format(table_name)
36+
sql2 = sql.format(table_name_c)
37+
38+
async with node.connect() as con:
39+
while True:
40+
try:
41+
filename = files.pop()
42+
except IndexError:
43+
break
44+
3745
print(filename)
38-
except IndexError:
39-
break
4046

41-
with open(filename, 'r') as f:
42-
data = json.load(f)
47+
with open(filename, 'r') as f:
48+
data = json.load(f)
4349

44-
if isinstance(data, dict):
45-
if 'rounds' in data:
46-
for obj in data['rounds']:
47-
sql = 'insert into {} values ($1)'
48-
for i in range(100):
49-
await con.execute(sql.format(table_name), json.dumps(obj))
50-
await con.execute(sql.format(table_name_c), json.dumps(obj))
50+
if isinstance(data, dict):
51+
if 'rounds' in data:
52+
for obj in data['rounds']:
53+
await con.execute(sql1, json.dumps(obj))
54+
await con.execute(sql2, json.dumps(obj))
5155

5256

5357
def main(loop):
@@ -58,8 +62,6 @@ def main(loop):
5862

5963
node.safe_psql('postgres', 'create extension jsonbd')
6064

61-
connections = [node.connect() for i in range(3)]
62-
6365
for name, root_dir in sources:
6466
table_name = name
6567
table_name_c = '%s_c' % name
@@ -68,20 +70,21 @@ def main(loop):
6870
node.safe_psql('postgres', 'create table %s(a jsonb compression jsonbd)' % table_name_c)
6971
node.safe_psql('postgres', 'alter table %s alter column a set storage external' % table_name)
7072

71-
files = []
7273
with cwd(os.path.abspath(root_dir)):
74+
files = []
7375
for filename in glob.iglob('**/*.json', recursive=True):
7476
if filename == 'package.json':
7577
continue
7678

7779
files.append(filename)
7880

79-
coroutines = [insert_data(con, files, table_name, table_name_c)
80-
for con in connections]
81+
print(len(files))
82+
coroutines = [insert_data(node, files, table_name, table_name_c)
83+
for i in range(4)]
8184
loop.run_until_complete(asyncio.gather(*coroutines))
8285

83-
print(node.safe_psql('postgres', "select pg_size_pretty(pg_total_relation_size('%s'))" % table_name))
84-
print(node.safe_psql('postgres', "select pg_size_pretty(pg_total_relation_size('%s'))" % table_name_c))
86+
print(node.safe_psql('postgres', "select pg_size_pretty(pg_total_relation_size('%s'))" % table_name))
87+
print(node.safe_psql('postgres', "select pg_size_pretty(pg_total_relation_size('%s'))" % table_name_c))
8588

8689

8790
if __name__ == '__main__':

jsonbd.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
304304

305305
hdr = shm_toc_lookup(toc, 0, false);
306306

307+
begin:
307308
/*
308309
* find some not busy worker,
309310
* the backend can intercept a worker that just started by another
@@ -398,6 +399,17 @@ jsonbd_communicate(shm_mq_iovec *iov, int iov_len,
398399

399400
comm:
400401
Assert(wd != NULL);
402+
Assert(LWLockHeldByMe(wd->lock));
403+
404+
/*
405+
* Even if we got the lock it doesn't mean that worker is free,
406+
* so try to set busy flag
407+
*/
408+
if (!pg_atomic_test_set_flag(&wd->busy))
409+
{
410+
LWLockRelease(wd->lock);
411+
goto begin;
412+
}
401413

402414
detached = false;
403415

jsonbd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ typedef struct jsonbd_shm_worker
2929
volatile Oid dboid; /* database of the worker */
3030
LWLock *lock;
3131
Latch latch;
32+
pg_atomic_flag busy; /* worker is busy */
3233
} jsonbd_shm_worker;
3334

3435
/* Shared memory structures */

jsonbd_worker.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ init_worker(dsm_segment *seg)
161161
/* Set launcher free */
162162
SetLatch(&hdr->launcher_latch);
163163
InitLatch(&worker_state->latch);
164+
pg_atomic_init_flag(&worker_state->busy);
164165

165166
/* make this worker visible in backend cycle */
166167
hdr->workers_ready++;
@@ -778,12 +779,9 @@ jsonbd_worker_main(Datum arg)
778779
if (resmq != SHM_MQ_SUCCESS)
779780
elog(NOTICE, "jsonbd: backend detached early");
780781

781-
/*
782-
* it is not safe call shm_mq_detach here, since mq can be
783-
* already cleared.
784-
* */
785-
/* shm_mq_detach(mqh); */
782+
shm_mq_detach(mqh);
786783
MemoryContextReset(worker_context);
784+
pg_atomic_clear_flag(&worker_state->busy);
787785
}
788786
}
789787

0 commit comments

Comments
 (0)