Skip to content

Commit c65ab0b

Browse files
committed
Recent changes in memory management in tuplesort.c had a problem: the case where we run low on array slots before we run low on memory is much more probable than I had thought, and so it's important to treat each tape fairly in that case.
To fix this, track per-tape slot allocations just like we track per-tape space allocation.
Also, in the FINALMERGE code path, avoid scanning all the input tapes when we really only need to read from one.
This should fix poor behavior with very large work_mem as exhibited by Stefan Kaltenbrunner.
I didn't do anything about putting an upper bound on the number of tapes, but maybe we should still consider that.
1 parent 9f61924 commit c65ab0b

File tree

1 file changed

+102
-82
lines changed

1 file changed

+102
-82
lines changed

src/backend/utils/sort/tuplesort.c

+102-82
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
* Portions Copyright (c) 1994, Regents of the University of California
9292
*
9393
* IDENTIFICATION
94-
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.64 2006/03/08 16:59:03 tgl Exp $
94+
* $PostgreSQL: pgsql/src/backend/utils/sort/tuplesort.c,v 1.65 2006/03/10 23:19:00 tgl Exp $
9595
*
9696
*-------------------------------------------------------------------------
9797
*/
@@ -266,24 +266,22 @@ struct Tuplesortstate
266266
* have not yet exhausted that run. mergenext[i] is the memtuples index
267267
* of the next pre-read tuple (next to be loaded into the heap) for tape
268268
* i, or 0 if we are out of pre-read tuples. mergelast[i] similarly
269-
* points to the last pre-read tuple from each tape. mergeavailmem[i] is
270-
* the amount of unused space allocated for tape i. mergefreelist and
271-
* mergefirstfree keep track of unused locations in the memtuples[] array.
272-
* The memtuples[].tupindex fields link together pre-read tuples for each
273-
* tape as well as recycled locations in mergefreelist. It is OK to use 0
274-
* as a null link in these lists, because memtuples[0] is part of the
275-
* merge heap and is never a pre-read tuple. mergeslotsfree counts the
276-
* total number of free memtuples[] slots, both those in the freelist and
277-
* those beyond mergefirstfree.
269+
* points to the last pre-read tuple from each tape. mergeavailslots[i]
270+
* is the number of unused memtuples[] slots reserved for tape i, and
271+
* mergeavailmem[i] is the amount of unused space allocated for tape i.
272+
* mergefreelist and mergefirstfree keep track of unused locations in the
273+
* memtuples[] array. The memtuples[].tupindex fields link together
274+
* pre-read tuples for each tape as well as recycled locations in
275+
* mergefreelist. It is OK to use 0 as a null link in these lists, because
276+
* memtuples[0] is part of the merge heap and is never a pre-read tuple.
278277
*/
279-
bool *mergeactive; /* Active input run source? */
278+
bool *mergeactive; /* active input run source? */
280279
int *mergenext; /* first preread tuple for each source */
281280
int *mergelast; /* last preread tuple for each source */
282-
long *mergeavailmem; /* availMem for prereading tapes */
283-
long spacePerTape; /* actual per-tape target usage */
281+
int *mergeavailslots; /* slots left for prereading each tape */
282+
long *mergeavailmem; /* availMem for prereading each tape */
284283
int mergefreelist; /* head of freelist of recycled slots */
285284
int mergefirstfree; /* first slot never used in this merge */
286-
int mergeslotsfree; /* number of free slots during merge */
287285

288286
/*
289287
* Variables for Algorithm D. Note that destTape is a "logical" tape
@@ -406,6 +404,7 @@ static void mergeruns(Tuplesortstate *state);
406404
static void mergeonerun(Tuplesortstate *state);
407405
static void beginmerge(Tuplesortstate *state);
408406
static void mergepreread(Tuplesortstate *state);
407+
static void mergeprereadone(Tuplesortstate *state, int srcTape);
409408
static void dumptuples(Tuplesortstate *state, bool alltuples);
410409
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
411410
int tupleindex, bool checkIndex);
@@ -1118,8 +1117,11 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
11181117
{
11191118
/*
11201119
* out of preloaded data on this tape, try to read more
1120+
*
1121+
* Unlike mergeonerun(), we only preload from the single
1122+
* tape that's run dry. See mergepreread() comments.
11211123
*/
1122-
mergepreread(state);
1124+
mergeprereadone(state, srcTape);
11231125

11241126
/*
11251127
* if still no data, we've reached end of run on this tape
@@ -1136,7 +1138,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
11361138
/* put the now-unused memtuples entry on the freelist */
11371139
newtup->tupindex = state->mergefreelist;
11381140
state->mergefreelist = tupIndex;
1139-
state->mergeslotsfree++;
1141+
state->mergeavailslots[srcTape]++;
11401142
return true;
11411143
}
11421144
return false;
@@ -1290,6 +1292,7 @@ inittapes(Tuplesortstate *state)
12901292
state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
12911293
state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
12921294
state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
1295+
state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
12931296
state->mergeavailmem = (long *) palloc0(maxTapes * sizeof(long));
12941297
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
12951298
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
@@ -1561,7 +1564,7 @@ mergeonerun(Tuplesortstate *state)
15611564
/* put the now-unused memtuples entry on the freelist */
15621565
tup->tupindex = state->mergefreelist;
15631566
state->mergefreelist = tupIndex;
1564-
state->mergeslotsfree++;
1567+
state->mergeavailslots[srcTape]++;
15651568
}
15661569

15671570
/*
@@ -1592,21 +1595,15 @@ beginmerge(Tuplesortstate *state)
15921595
int activeTapes;
15931596
int tapenum;
15941597
int srcTape;
1598+
int slotsPerTape;
1599+
long spacePerTape;
15951600

15961601
/* Heap should be empty here */
15971602
Assert(state->memtupcount == 0);
15981603

1599-
/* Clear merge-pass state variables */
1600-
memset(state->mergeactive, 0, state->maxTapes * sizeof(*state->mergeactive));
1601-
memset(state->mergenext, 0, state->maxTapes * sizeof(*state->mergenext));
1602-
memset(state->mergelast, 0, state->maxTapes * sizeof(*state->mergelast));
1603-
memset(state->mergeavailmem, 0, state->maxTapes * sizeof(*state->mergeavailmem));
1604-
state->mergefreelist = 0; /* nothing in the freelist */
1605-
state->mergefirstfree = state->maxTapes; /* 1st slot avail for preread */
1606-
state->mergeslotsfree = state->memtupsize - state->mergefirstfree;
1607-
Assert(state->mergeslotsfree >= state->maxTapes);
1608-
16091604
/* Adjust run counts and mark the active tapes */
1605+
memset(state->mergeactive, 0,
1606+
state->maxTapes * sizeof(*state->mergeactive));
16101607
activeTapes = 0;
16111608
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
16121609
{
@@ -1623,16 +1620,29 @@ beginmerge(Tuplesortstate *state)
16231620
}
16241621
state->activeTapes = activeTapes;
16251622

1623+
/* Clear merge-pass state variables */
1624+
memset(state->mergenext, 0,
1625+
state->maxTapes * sizeof(*state->mergenext));
1626+
memset(state->mergelast, 0,
1627+
state->maxTapes * sizeof(*state->mergelast));
1628+
state->mergefreelist = 0; /* nothing in the freelist */
1629+
state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
1630+
16261631
/*
16271632
* Initialize space allocation to let each active input tape have an equal
16281633
* share of preread space.
16291634
*/
16301635
Assert(activeTapes > 0);
1631-
state->spacePerTape = state->availMem / activeTapes;
1636+
slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
1637+
Assert(slotsPerTape > 0);
1638+
spacePerTape = state->availMem / activeTapes;
16321639
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
16331640
{
16341641
if (state->mergeactive[srcTape])
1635-
state->mergeavailmem[srcTape] = state->spacePerTape;
1642+
{
1643+
state->mergeavailslots[srcTape] = slotsPerTape;
1644+
state->mergeavailmem[srcTape] = spacePerTape;
1645+
}
16361646
}
16371647

16381648
/*
@@ -1657,7 +1667,7 @@ beginmerge(Tuplesortstate *state)
16571667
/* put the now-unused memtuples entry on the freelist */
16581668
tup->tupindex = state->mergefreelist;
16591669
state->mergefreelist = tupIndex;
1660-
state->mergeslotsfree++;
1670+
state->mergeavailslots[srcTape]++;
16611671
}
16621672
}
16631673
}
@@ -1670,73 +1680,83 @@ beginmerge(Tuplesortstate *state)
16701680
* active source tape until the tape's run is exhausted or it has used up
16711681
* its fair share of available memory. In any case, we guarantee that there
16721682
* is at least one preread tuple available from each unexhausted input tape.
1683+
*
1684+
* We invoke this routine at the start of a merge pass for initial load,
1685+
* and then whenever any tape's preread data runs out. Note that we load
1686+
* as much data as possible from all tapes, not just the one that ran out.
1687+
* This is because logtape.c works best with a usage pattern that alternates
1688+
* between reading a lot of data and writing a lot of data, so whenever we
1689+
* are forced to read, we should fill working memory completely.
1690+
*
1691+
* In FINALMERGE state, we *don't* use this routine, but instead just preread
1692+
* from the single tape that ran dry. There's no read/write alternation in
1693+
* that state and so no point in scanning through all the tapes to fix one.
1694+
* (Moreover, there may be quite a lot of inactive tapes in that state, since
1695+
* we might have had many fewer runs than tapes. In a regular tape-to-tape
1696+
* merge we can expect most of the tapes to be active.)
16731697
*/
16741698
static void
16751699
mergepreread(Tuplesortstate *state)
16761700
{
16771701
int srcTape;
1702+
1703+
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
1704+
mergeprereadone(state, srcTape);
1705+
}
1706+
1707+
/*
1708+
* mergeprereadone - load tuples from one merge input tape
1709+
*
1710+
* Read tuples from the specified tape until it has used up its free memory
1711+
* or array slots; but ensure that we have at least one tuple, if any are
1712+
* to be had.
1713+
*/
1714+
static void
1715+
mergeprereadone(Tuplesortstate *state, int srcTape)
1716+
{
16781717
unsigned int tuplen;
16791718
SortTuple stup;
16801719
int tupIndex;
16811720
long priorAvail,
16821721
spaceUsed;
16831722

1684-
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
1723+
if (!state->mergeactive[srcTape])
1724+
return; /* tape's run is already exhausted */
1725+
priorAvail = state->availMem;
1726+
state->availMem = state->mergeavailmem[srcTape];
1727+
while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
1728+
state->mergenext[srcTape] == 0)
16851729
{
1686-
if (!state->mergeactive[srcTape])
1687-
continue;
1688-
1689-
/*
1690-
* Skip reading from any tape that still has at least half of its
1691-
* target memory filled with tuples (threshold fraction may need
1692-
* adjustment?). This avoids reading just a few tuples when the
1693-
* incoming runs are not being consumed evenly.
1694-
*/
1695-
if (state->mergenext[srcTape] != 0 &&
1696-
state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
1697-
continue;
1698-
1699-
/*
1700-
* Read tuples from this tape until it has used up its free memory,
1701-
* or we are low on memtuples slots; but ensure that we have at least
1702-
* one tuple.
1703-
*/
1704-
priorAvail = state->availMem;
1705-
state->availMem = state->mergeavailmem[srcTape];
1706-
while ((!LACKMEM(state) && state->mergeslotsfree > state->tapeRange) ||
1707-
state->mergenext[srcTape] == 0)
1730+
/* read next tuple, if any */
1731+
if ((tuplen = getlen(state, srcTape, true)) == 0)
17081732
{
1709-
/* read next tuple, if any */
1710-
if ((tuplen = getlen(state, srcTape, true)) == 0)
1711-
{
1712-
state->mergeactive[srcTape] = false;
1713-
break;
1714-
}
1715-
READTUP(state, &stup, srcTape, tuplen);
1716-
/* find a free slot in memtuples[] for it */
1717-
tupIndex = state->mergefreelist;
1718-
if (tupIndex)
1719-
state->mergefreelist = state->memtuples[tupIndex].tupindex;
1720-
else
1721-
{
1722-
tupIndex = state->mergefirstfree++;
1723-
Assert(tupIndex < state->memtupsize);
1724-
}
1725-
state->mergeslotsfree--;
1726-
/* store tuple, append to list for its tape */
1727-
stup.tupindex = 0;
1728-
state->memtuples[tupIndex] = stup;
1729-
if (state->mergelast[srcTape])
1730-
state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
1731-
else
1732-
state->mergenext[srcTape] = tupIndex;
1733-
state->mergelast[srcTape] = tupIndex;
1733+
state->mergeactive[srcTape] = false;
1734+
break;
17341735
}
1735-
/* update per-tape and global availmem counts */
1736-
spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
1737-
state->mergeavailmem[srcTape] = state->availMem;
1738-
state->availMem = priorAvail - spaceUsed;
1736+
READTUP(state, &stup, srcTape, tuplen);
1737+
/* find a free slot in memtuples[] for it */
1738+
tupIndex = state->mergefreelist;
1739+
if (tupIndex)
1740+
state->mergefreelist = state->memtuples[tupIndex].tupindex;
1741+
else
1742+
{
1743+
tupIndex = state->mergefirstfree++;
1744+
Assert(tupIndex < state->memtupsize);
1745+
}
1746+
state->mergeavailslots[srcTape]--;
1747+
/* store tuple, append to list for its tape */
1748+
stup.tupindex = 0;
1749+
state->memtuples[tupIndex] = stup;
1750+
if (state->mergelast[srcTape])
1751+
state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex;
1752+
else
1753+
state->mergenext[srcTape] = tupIndex;
1754+
state->mergelast[srcTape] = tupIndex;
17391755
}
1756+
/* update per-tape and global availmem counts */
1757+
spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
1758+
state->mergeavailmem[srcTape] = state->availMem;
1759+
state->availMem = priorAvail - spaceUsed;
17401760
}
17411761

17421762
/*

0 commit comments

Comments
 (0)