Skip to content

Commit

Permalink
wip: splitme: variable per-io memory for pgsr
Browse files Browse the repository at this point in the history
Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
  • Loading branch information
anarazel committed Dec 14, 2023
1 parent 70a3e1b commit a08cd71
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 94 deletions.
25 changes: 14 additions & 11 deletions contrib/pg_prewarm/pg_prewarm.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ typedef struct prefetch
} prefetch;

static BlockNumber
prewarm_buffer_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
prewarm_buffer_next(PgStreamingRead *pgsr,
uintptr_t pgsr_private, void *io_private,
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
{
prefetch *p = (prefetch *) pgsr_private;
Expand All @@ -72,7 +73,7 @@ prewarm_buffer_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,

static PgStreamingReadNextStatus
prewarm_smgr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
PgAioInProgress *aio, uintptr_t *read_private)
PgAioInProgress *aio, void *read_private)
{
prefetch *p = (prefetch *) pgsr_private;
BlockNumber blockno;
Expand All @@ -96,13 +97,13 @@ prewarm_smgr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
pgaio_io_start_read_smgr(aio, p->rel->rd_smgr, p->forkNumber, blockno,
pgaio_bounce_buffer_buffer(bb));

*read_private = (uintptr_t) bb;
*(void **) read_private = bb;

return PGSR_NEXT_IO;
}

static void
prewarm_smgr_release(uintptr_t pgsr_private, uintptr_t read_private)
prewarm_smgr_release(uintptr_t pgsr_private, void *read_private)
{
}

Expand Down Expand Up @@ -279,7 +280,7 @@ pg_prewarm(PG_FUNCTION_ARGS)
p.lastblock = last_block;
p.bbs = NIL;

pgsr = pg_streaming_read_buffer_alloc(512, (uintptr_t) &p,
pgsr = pg_streaming_read_buffer_alloc(512, 0, (uintptr_t) &p,
NULL,
prewarm_buffer_next);

Expand All @@ -289,7 +290,7 @@ pg_prewarm(PG_FUNCTION_ARGS)

CHECK_FOR_INTERRUPTS();

buf = (Buffer) pg_streaming_read_get_next(pgsr);
buf = (Buffer) pg_streaming_read_buffer_get_next(pgsr, NULL);
if (BufferIsValid(buf))
ReleaseBuffer(buf);
else
Expand All @@ -298,7 +299,7 @@ pg_prewarm(PG_FUNCTION_ARGS)
++blocks_done;
}

if (BufferIsValid(pg_streaming_read_get_next(pgsr)))
if (BufferIsValid(pg_streaming_read_buffer_get_next(pgsr, NULL)))
elog(ERROR, "unexpected additional buffer");

pg_streaming_read_free(pgsr);
Expand All @@ -315,21 +316,23 @@ pg_prewarm(PG_FUNCTION_ARGS)
p.lastblock = last_block;
p.bbs = NIL;

pgsr = pg_streaming_read_alloc(512, (uintptr_t) &p,
pgsr = pg_streaming_read_alloc(512,
sizeof(void *),
(uintptr_t) &p,
prewarm_smgr_next,
prewarm_smgr_release);

for (block = first_block; block <= last_block; ++block)
{
PgAioBounceBuffer *bb;
PgAioBounceBuffer **bb;

CHECK_FOR_INTERRUPTS();

bb = (PgAioBounceBuffer *) pg_streaming_read_get_next(pgsr);
bb = (PgAioBounceBuffer **) pg_streaming_read_get_next(pgsr);
if (bb == NULL)
elog(ERROR, "prefetch ended early");

p.bbs = lappend(p.bbs, (void *) bb);
p.bbs = lappend(p.bbs, (void *) *bb);

++blocks_done;
}
Expand Down
16 changes: 10 additions & 6 deletions src/backend/access/heap/heapam.c
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
*/

static BlockNumber
heap_pgsr_next_single(PgStreamingRead *pgsr, uintptr_t pgsr_private,
heap_pgsr_next_single(PgStreamingRead *pgsr,
uintptr_t pgsr_private, void *io_private,
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
{
HeapScanDesc scan = (HeapScanDesc) pgsr_private;
Expand Down Expand Up @@ -268,7 +269,8 @@ heap_pgsr_next_single(PgStreamingRead *pgsr, uintptr_t pgsr_private,
}

static BlockNumber
heap_pgsr_next_parallel(PgStreamingRead *pgsr, uintptr_t pgsr_private,
heap_pgsr_next_parallel(PgStreamingRead *pgsr,
uintptr_t pgsr_private, void *io_private,
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
{
HeapScanDesc scan = (HeapScanDesc) pgsr_private;
Expand All @@ -294,7 +296,8 @@ heap_pgsr_single_alloc(HeapScanDesc scan)
{
int iodepth = Max(Min(512, NBuffers / 128), 1);

return pg_streaming_read_buffer_alloc(iodepth, (uintptr_t) scan,
return pg_streaming_read_buffer_alloc(iodepth, 0,
(uintptr_t) scan,
scan->rs_strategy,
heap_pgsr_next_single);
}
Expand All @@ -304,7 +307,8 @@ heap_pgsr_parallel_alloc(HeapScanDesc scan)
{
int iodepth = Max(Min(512, NBuffers / 128), 1);

return pg_streaming_read_buffer_alloc(iodepth, (uintptr_t) scan,
return pg_streaming_read_buffer_alloc(iodepth, 0,
(uintptr_t) scan,
scan->rs_strategy,
heap_pgsr_next_parallel);
}
Expand Down Expand Up @@ -624,7 +628,7 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir, Buffer *pgsr_buf)
/* FIXME: Integrate more neatly */
if (scan->pgsr)
{
*pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
*pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL);
if (*pgsr_buf == InvalidBuffer)
return InvalidBlockNumber;
return BufferGetBlockNumber(*pgsr_buf);
Expand Down Expand Up @@ -784,7 +788,7 @@ heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir
block = InvalidBlockNumber;
}
#endif
*pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
*pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL);
if (*pgsr_buf == InvalidBuffer)
return InvalidBlockNumber;
Assert(scan->rs_base.rs_parallel ||
Expand Down
67 changes: 29 additions & 38 deletions src/backend/access/heap/vacuumlazy.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
}

static BlockNumber
vacuum_scan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
vacuum_scan_pgsr_next(PgStreamingRead *pgsr,
uintptr_t pgsr_private, void *io_private,
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
{
LVRelState *vacrel = (LVRelState *) pgsr_private;
Expand Down Expand Up @@ -847,15 +848,6 @@ vacuum_scan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
return InvalidBlockNumber;
}

static void
vacuum_pgsr_release(uintptr_t pgsr_private, uintptr_t read_private)
{
Buffer buf = (Buffer) read_private;

Assert(BufferIsValid(buf));
ReleaseBuffer(buf);
}

/*
* lazy_scan_heap() -- workhorse function for VACUUM
*
Expand Down Expand Up @@ -909,7 +901,8 @@ lazy_scan_heap(LVRelState *vacrel)
{
int iodepth = Max(Min(128, NBuffers / 128), 1);

pgsr = pg_streaming_read_buffer_alloc(iodepth, (uintptr_t) vacrel,
pgsr = pg_streaming_read_buffer_alloc(iodepth, 0,
(uintptr_t) vacrel,
vacrel->bstrategy,
vacuum_scan_pgsr_next);
}
Expand All @@ -936,7 +929,7 @@ lazy_scan_heap(LVRelState *vacrel)
bool all_visible_according_to_vm = false;
LVPagePruneState prunestate;

buf = pg_streaming_read_get_next(pgsr);
buf = pg_streaming_read_buffer_get_next(pgsr, NULL);
if (!BufferIsValid(buf))
break;

Expand Down Expand Up @@ -2505,7 +2498,6 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
typedef struct VacuumHeapBlockState
{
BlockNumber blkno;
Buffer buffer;
int start_tupindex;
int end_tupindex;
} VacuumHeapBlockState;
Expand All @@ -2518,19 +2510,19 @@ typedef struct VacuumHeapState
int next_tupindex;
} VacuumHeapState;

static PgStreamingReadNextStatus
vacuum_heap_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
PgAioInProgress *aio, uintptr_t *read_private)
static BlockNumber
vacuum_heap_pgsr_next(PgStreamingRead *pgsr,
uintptr_t pgsr_private,
void *io_private,
struct RelationData **rel, ForkNumber *forkNum, ReadBufferMode *mode)
{
VacuumHeapState *vhs = (VacuumHeapState *) pgsr_private;
VacDeadItems *dead_items = vhs->vacrel->dead_items;
VacuumHeapBlockState *bs;
bool already_valid;
VacuumHeapBlockState *bs = io_private;

if (vhs->next_tupindex == dead_items->num_items)
return PGSR_NEXT_END;
return InvalidBlockNumber;

bs = palloc0(sizeof(*bs));
bs->blkno = ItemPointerGetBlockNumber(&dead_items->items[vhs->next_tupindex]);
bs->start_tupindex = vhs->next_tupindex;
bs->end_tupindex = vhs->next_tupindex;
Expand All @@ -2544,15 +2536,10 @@ vacuum_heap_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
bs->end_tupindex = vhs->next_tupindex;
}

bs->buffer = ReadBufferAsync(vhs->relation, MAIN_FORKNUM, bs->blkno,
RBM_NORMAL, vhs->vacrel->bstrategy, &already_valid,
&aio);
*read_private = (uintptr_t) bs;

if (already_valid)
return PGSR_NEXT_NO_IO;
else
return PGSR_NEXT_IO;
*rel = vhs->relation;
*forkNum = MAIN_FORKNUM;
*mode = RBM_NORMAL;
return bs->blkno;
}

/*
Expand Down Expand Up @@ -2600,16 +2587,20 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
vhs.vacrel = vacrel;
vhs.last_block = InvalidBlockNumber;
vhs.next_tupindex = 0;
pgsr = pg_streaming_read_alloc(512, (uintptr_t) &vhs,
vacuum_heap_pgsr_next,
vacuum_pgsr_release);
pgsr = pg_streaming_read_buffer_alloc(512,
sizeof(VacuumHeapBlockState),
(uintptr_t) &vhs,
vacrel->bstrategy,
vacuum_heap_pgsr_next);
while (true)
{
VacuumHeapBlockState *bs = (VacuumHeapBlockState *) pg_streaming_read_get_next(pgsr);
VacuumHeapBlockState *bs;
Page page;
Size freespace;
Buffer buffer;

if (bs == NULL)
buffer = pg_streaming_read_buffer_get_next(pgsr, (void **) &bs);
if (!BufferIsValid(buffer))
break;

Assert(bs->start_tupindex == index);
Expand All @@ -2623,15 +2614,15 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
visibilitymap_pin(vacrel->rel, bs->blkno, &vmbuffer);

/* We need a non-cleanup exclusive lock to mark dead_items unused */
LockBuffer(bs->buffer, BUFFER_LOCK_EXCLUSIVE);
index = lazy_vacuum_heap_page(vacrel, bs->blkno, bs->buffer, index,
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
index = lazy_vacuum_heap_page(vacrel, bs->blkno, buffer, index,
vmbuffer);

/* Now that we've compacted the page, record its available space */
page = BufferGetPage(bs->buffer);
page = BufferGetPage(buffer);
freespace = PageGetHeapFreeSpace(page);

UnlockReleaseBuffer(bs->buffer);
UnlockReleaseBuffer(buffer);
RecordPageWithFreeSpace(vacrel->rel, bs->blkno, freespace);
vacuumed_pages++;

Expand Down
6 changes: 4 additions & 2 deletions src/backend/access/nbtree/nbtree.c
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,7 @@ btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)

static BlockNumber
btvacuumscan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
void *io_private,
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
{
BTVacState *vstate = (BTVacState *) pgsr_private;
Expand Down Expand Up @@ -1030,14 +1031,15 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
vstate.nextblock = scanblkno;
vstate.num_pages = num_pages;

pgsr = pg_streaming_read_buffer_alloc(512, (uintptr_t) &vstate,
pgsr = pg_streaming_read_buffer_alloc(512, 0,
(uintptr_t) &vstate,
vstate.info->strategy,
btvacuumscan_pgsr_next);

/* Iterate over pages, then loop back to recheck length */
for (; scanblkno < num_pages; scanblkno++)
{
Buffer buf = pg_streaming_read_get_next(pgsr);
Buffer buf = pg_streaming_read_buffer_get_next(pgsr, NULL);

Assert(BufferIsValid(buf));
Assert(BufferGetBlockNumber(buf) == scanblkno);
Expand Down
Loading

0 comments on commit a08cd71

Please sign in to comment.