From a08cd717b5af4e51afb25ec86623973158a72ab9 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Tue, 15 Aug 2023 20:01:07 -0700 Subject: [PATCH] wip: splitme: variable per-io memory for pgsr Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch: --- contrib/pg_prewarm/pg_prewarm.c | 25 +++-- src/backend/access/heap/heapam.c | 16 +-- src/backend/access/heap/vacuumlazy.c | 67 +++++------- src/backend/access/nbtree/nbtree.c | 6 +- src/backend/storage/aio/streaming_read.c | 130 +++++++++++++++++------ src/include/storage/streaming_read.h | 16 ++- 6 files changed, 166 insertions(+), 94 deletions(-) diff --git a/contrib/pg_prewarm/pg_prewarm.c b/contrib/pg_prewarm/pg_prewarm.c index f91c493e3fbd5..16d5250a01958 100644 --- a/contrib/pg_prewarm/pg_prewarm.c +++ b/contrib/pg_prewarm/pg_prewarm.c @@ -53,7 +53,8 @@ typedef struct prefetch } prefetch; static BlockNumber -prewarm_buffer_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, +prewarm_buffer_next(PgStreamingRead *pgsr, + uintptr_t pgsr_private, void *io_private, Relation *rel, ForkNumber *fork, ReadBufferMode *mode) { prefetch *p = (prefetch *) pgsr_private; @@ -72,7 +73,7 @@ prewarm_buffer_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, static PgStreamingReadNextStatus prewarm_smgr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, - PgAioInProgress *aio, uintptr_t *read_private) + PgAioInProgress *aio, void *read_private) { prefetch *p = (prefetch *) pgsr_private; BlockNumber blockno; @@ -96,13 +97,13 @@ prewarm_smgr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, pgaio_io_start_read_smgr(aio, p->rel->rd_smgr, p->forkNumber, blockno, pgaio_bounce_buffer_buffer(bb)); - *read_private = (uintptr_t) bb; + *(void **) read_private = bb; return PGSR_NEXT_IO; } static void -prewarm_smgr_release(uintptr_t pgsr_private, uintptr_t read_private) +prewarm_smgr_release(uintptr_t pgsr_private, void *read_private) { } @@ -279,7 +280,7 @@ pg_prewarm(PG_FUNCTION_ARGS) p.lastblock = last_block; p.bbs = NIL; - 
pgsr = pg_streaming_read_buffer_alloc(512, (uintptr_t) &p, + pgsr = pg_streaming_read_buffer_alloc(512, 0, (uintptr_t) &p, NULL, prewarm_buffer_next); @@ -289,7 +290,7 @@ pg_prewarm(PG_FUNCTION_ARGS) CHECK_FOR_INTERRUPTS(); - buf = (Buffer) pg_streaming_read_get_next(pgsr); + buf = (Buffer) pg_streaming_read_buffer_get_next(pgsr, NULL); if (BufferIsValid(buf)) ReleaseBuffer(buf); else @@ -298,7 +299,7 @@ pg_prewarm(PG_FUNCTION_ARGS) ++blocks_done; } - if (BufferIsValid(pg_streaming_read_get_next(pgsr))) + if (BufferIsValid(pg_streaming_read_buffer_get_next(pgsr, NULL))) elog(ERROR, "unexpected additional buffer"); pg_streaming_read_free(pgsr); @@ -315,21 +316,23 @@ pg_prewarm(PG_FUNCTION_ARGS) p.lastblock = last_block; p.bbs = NIL; - pgsr = pg_streaming_read_alloc(512, (uintptr_t) &p, + pgsr = pg_streaming_read_alloc(512, + sizeof(void *), + (uintptr_t) &p, prewarm_smgr_next, prewarm_smgr_release); for (block = first_block; block <= last_block; ++block) { - PgAioBounceBuffer *bb; + PgAioBounceBuffer **bb; CHECK_FOR_INTERRUPTS(); - bb = (PgAioBounceBuffer *) pg_streaming_read_get_next(pgsr); + bb = (PgAioBounceBuffer **) pg_streaming_read_get_next(pgsr); if (bb == NULL) elog(ERROR, "prefetch ended early"); - p.bbs = lappend(p.bbs, (void *) bb); + p.bbs = lappend(p.bbs, (void *) *bb); ++blocks_done; } diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 27d25053d559a..7d0a2c70a0c75 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -227,7 +227,8 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = */ static BlockNumber -heap_pgsr_next_single(PgStreamingRead *pgsr, uintptr_t pgsr_private, +heap_pgsr_next_single(PgStreamingRead *pgsr, + uintptr_t pgsr_private, void *io_private, Relation *rel, ForkNumber *fork, ReadBufferMode *mode) { HeapScanDesc scan = (HeapScanDesc) pgsr_private; @@ -268,7 +269,8 @@ heap_pgsr_next_single(PgStreamingRead *pgsr, uintptr_t pgsr_private, } static 
BlockNumber -heap_pgsr_next_parallel(PgStreamingRead *pgsr, uintptr_t pgsr_private, +heap_pgsr_next_parallel(PgStreamingRead *pgsr, + uintptr_t pgsr_private, void *io_private, Relation *rel, ForkNumber *fork, ReadBufferMode *mode) { HeapScanDesc scan = (HeapScanDesc) pgsr_private; @@ -294,7 +296,8 @@ heap_pgsr_single_alloc(HeapScanDesc scan) { int iodepth = Max(Min(512, NBuffers / 128), 1); - return pg_streaming_read_buffer_alloc(iodepth, (uintptr_t) scan, + return pg_streaming_read_buffer_alloc(iodepth, 0, + (uintptr_t) scan, scan->rs_strategy, heap_pgsr_next_single); } @@ -304,7 +307,8 @@ heap_pgsr_parallel_alloc(HeapScanDesc scan) { int iodepth = Max(Min(512, NBuffers / 128), 1); - return pg_streaming_read_buffer_alloc(iodepth, (uintptr_t) scan, + return pg_streaming_read_buffer_alloc(iodepth, 0, + (uintptr_t) scan, scan->rs_strategy, heap_pgsr_next_parallel); } @@ -624,7 +628,7 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir, Buffer *pgsr_buf) /* FIXME: Integrate more neatly */ if (scan->pgsr) { - *pgsr_buf = pg_streaming_read_get_next(scan->pgsr); + *pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL); if (*pgsr_buf == InvalidBuffer) return InvalidBlockNumber; return BufferGetBlockNumber(*pgsr_buf); @@ -784,7 +788,7 @@ heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir block = InvalidBlockNumber; } #endif - *pgsr_buf = pg_streaming_read_get_next(scan->pgsr); + *pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL); if (*pgsr_buf == InvalidBuffer) return InvalidBlockNumber; Assert(scan->rs_base.rs_parallel || diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 1999b8829298e..0af5a47af7688 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -799,7 +799,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, } static BlockNumber -vacuum_scan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, 
+vacuum_scan_pgsr_next(PgStreamingRead *pgsr, + uintptr_t pgsr_private, void *io_private, Relation *rel, ForkNumber *fork, ReadBufferMode *mode) { LVRelState *vacrel = (LVRelState *) pgsr_private; @@ -847,15 +848,6 @@ vacuum_scan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, return InvalidBlockNumber; } -static void -vacuum_pgsr_release(uintptr_t pgsr_private, uintptr_t read_private) -{ - Buffer buf = (Buffer) read_private; - - Assert(BufferIsValid(buf)); - ReleaseBuffer(buf); -} - /* * lazy_scan_heap() -- workhorse function for VACUUM * @@ -909,7 +901,8 @@ lazy_scan_heap(LVRelState *vacrel) { int iodepth = Max(Min(128, NBuffers / 128), 1); - pgsr = pg_streaming_read_buffer_alloc(iodepth, (uintptr_t) vacrel, + pgsr = pg_streaming_read_buffer_alloc(iodepth, 0, + (uintptr_t) vacrel, vacrel->bstrategy, vacuum_scan_pgsr_next); } @@ -936,7 +929,7 @@ lazy_scan_heap(LVRelState *vacrel) bool all_visible_according_to_vm = false; LVPagePruneState prunestate; - buf = pg_streaming_read_get_next(pgsr); + buf = pg_streaming_read_buffer_get_next(pgsr, NULL); if (!BufferIsValid(buf)) break; @@ -2505,7 +2498,6 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) typedef struct VacuumHeapBlockState { BlockNumber blkno; - Buffer buffer; int start_tupindex; int end_tupindex; } VacuumHeapBlockState; @@ -2518,19 +2510,19 @@ typedef struct VacuumHeapState int next_tupindex; } VacuumHeapState; -static PgStreamingReadNextStatus -vacuum_heap_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, - PgAioInProgress *aio, uintptr_t *read_private) +static BlockNumber +vacuum_heap_pgsr_next(PgStreamingRead *pgsr, + uintptr_t pgsr_private, + void *io_private, + struct RelationData **rel, ForkNumber *forkNum, ReadBufferMode *mode) { VacuumHeapState *vhs = (VacuumHeapState *) pgsr_private; VacDeadItems *dead_items = vhs->vacrel->dead_items; - VacuumHeapBlockState *bs; - bool already_valid; + VacuumHeapBlockState *bs = io_private; if (vhs->next_tupindex == dead_items->num_items) - return 
PGSR_NEXT_END; + return InvalidBlockNumber; - bs = palloc0(sizeof(*bs)); bs->blkno = ItemPointerGetBlockNumber(&dead_items->items[vhs->next_tupindex]); bs->start_tupindex = vhs->next_tupindex; bs->end_tupindex = vhs->next_tupindex; @@ -2544,15 +2536,10 @@ vacuum_heap_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, bs->end_tupindex = vhs->next_tupindex; } - bs->buffer = ReadBufferAsync(vhs->relation, MAIN_FORKNUM, bs->blkno, - RBM_NORMAL, vhs->vacrel->bstrategy, &already_valid, - &aio); - *read_private = (uintptr_t) bs; - - if (already_valid) - return PGSR_NEXT_NO_IO; - else - return PGSR_NEXT_IO; + *rel = vhs->relation; + *forkNum = MAIN_FORKNUM; + *mode = RBM_NORMAL; + return bs->blkno; } /* @@ -2600,16 +2587,20 @@ lazy_vacuum_heap_rel(LVRelState *vacrel) vhs.vacrel = vacrel; vhs.last_block = InvalidBlockNumber; vhs.next_tupindex = 0; - pgsr = pg_streaming_read_alloc(512, (uintptr_t) &vhs, - vacuum_heap_pgsr_next, - vacuum_pgsr_release); + pgsr = pg_streaming_read_buffer_alloc(512, + sizeof(VacuumHeapBlockState), + (uintptr_t) &vhs, + vacrel->bstrategy, + vacuum_heap_pgsr_next); while (true) { - VacuumHeapBlockState *bs = (VacuumHeapBlockState *) pg_streaming_read_get_next(pgsr); + VacuumHeapBlockState *bs; Page page; Size freespace; + Buffer buffer; - if (bs == NULL) + buffer = pg_streaming_read_buffer_get_next(pgsr, (void **) &bs); + if (!BufferIsValid(buffer)) break; Assert(bs->start_tupindex == index); @@ -2623,15 +2614,15 @@ lazy_vacuum_heap_rel(LVRelState *vacrel) visibilitymap_pin(vacrel->rel, bs->blkno, &vmbuffer); /* We need a non-cleanup exclusive lock to mark dead_items unused */ - LockBuffer(bs->buffer, BUFFER_LOCK_EXCLUSIVE); - index = lazy_vacuum_heap_page(vacrel, bs->blkno, bs->buffer, index, + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + index = lazy_vacuum_heap_page(vacrel, bs->blkno, buffer, index, vmbuffer); /* Now that we've compacted the page, record its available space */ - page = BufferGetPage(bs->buffer); + page = 
BufferGetPage(buffer); freespace = PageGetHeapFreeSpace(page); - UnlockReleaseBuffer(bs->buffer); + UnlockReleaseBuffer(buffer); RecordPageWithFreeSpace(vacrel->rel, bs->blkno, freespace); vacuumed_pages++; diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 9a1a4548c0f96..5b2c328d100b8 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -897,6 +897,7 @@ btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) static BlockNumber btvacuumscan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, + void *io_private, Relation *rel, ForkNumber *fork, ReadBufferMode *mode) { BTVacState *vstate = (BTVacState *) pgsr_private; @@ -1030,14 +1031,15 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, vstate.nextblock = scanblkno; vstate.num_pages = num_pages; - pgsr = pg_streaming_read_buffer_alloc(512, (uintptr_t) &vstate, + pgsr = pg_streaming_read_buffer_alloc(512, 0, + (uintptr_t) &vstate, vstate.info->strategy, btvacuumscan_pgsr_next); /* Iterate over pages, then loop back to recheck length */ for (; scanblkno < num_pages; scanblkno++) { - Buffer buf = pg_streaming_read_get_next(pgsr); + Buffer buf = pg_streaming_read_buffer_get_next(pgsr, NULL); Assert(BufferIsValid(buf)); Assert(BufferGetBlockNumber(buf) == scanblkno); diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c index 1b469ec1dcf17..5310ff133718e 100644 --- a/src/backend/storage/aio/streaming_read.c +++ b/src/backend/storage/aio/streaming_read.c @@ -35,7 +35,7 @@ typedef struct PgStreamingReadItem bool in_progress; /* is this item currently valid / used */ bool valid; - uintptr_t read_private; + void *io_private; } PgStreamingReadItem; typedef void (*PgStreamingReadLayerFreeCB) (PgStreamingRead *pgsr); @@ -46,6 +46,8 @@ struct PgStreamingRead uint32 distance_max; uint32 all_items_count; + uint32 per_io_private_size; + uintptr_t pgsr_private; 
PgStreamingReadDetermineNextCB determine_next_cb; PgStreamingReadRelease release_cb; @@ -77,6 +79,13 @@ struct PgStreamingRead /* available reads (unused or completed) */ dlist_head available; + /* + * Last item returned by pg_streaming_read_get_next() et al. Not yet added + * to ->available, as last_returned->io_private might still be used by + * caller. + */ + PgStreamingReadItem *last_returned; + /* * IOs, be they completed or in progress, in the order that the callback * returned them. @@ -89,21 +98,38 @@ struct PgStreamingRead static void pg_streaming_read_complete(PgAioOnCompletionLocalContext *ocb, PgAioInProgress *io); static void pg_streaming_read_prefetch(PgStreamingRead *pgsr); +/* + * Allocates a streaming read instance. + * + * Each IO has per_io_private_size private memory, which can be set in + * determine_next_cb() and is returned by pg_streaming_read_get_next(). + */ PgStreamingRead * -pg_streaming_read_alloc(uint32 iodepth, uintptr_t pgsr_private, +pg_streaming_read_alloc(uint32 iodepth, + uint32 per_io_private_size, uintptr_t pgsr_private, PgStreamingReadDetermineNextCB determine_next_cb, PgStreamingReadRelease release_cb) { PgStreamingRead *pgsr; + uint32 all_items_count; + size_t pgsr_sz, total_sz; + char *p; iodepth = Max(Min(iodepth, NBuffers / 128), 1); + all_items_count = iodepth * 2; + + pgsr_sz = offsetof(PgStreamingRead, all_items) + + all_items_count * sizeof(PgStreamingReadItem); + total_sz = pgsr_sz + + all_items_count * per_io_private_size; - pgsr = palloc0(offsetof(PgStreamingRead, all_items) + - sizeof(PgStreamingReadItem) * iodepth * 2); + p = palloc0(total_sz); + pgsr = (PgStreamingRead *) p; + p += pgsr_sz; pgsr->iodepth_max = iodepth; pgsr->distance_max = iodepth; - pgsr->all_items_count = pgsr->iodepth_max + pgsr->distance_max; + pgsr->all_items_count = all_items_count; pgsr->pgsr_private = pgsr_private; pgsr->determine_next_cb = determine_next_cb; pgsr->release_cb = release_cb; @@ -120,6 +146,9 @@ pg_streaming_read_alloc(uint32 
iodepth, uintptr_t pgsr_private, this_read->on_completion.callback = pg_streaming_read_complete; this_read->pgsr = pgsr; + this_read->io_private = p; + p += per_io_private_size; + dlist_push_tail(&pgsr->available, &this_read->node); } @@ -148,7 +177,7 @@ pg_streaming_read_free(PgStreamingRead *pgsr) } if (this_read->valid) - pgsr->release_cb(pgsr->pgsr_private, this_read->read_private); + pgsr->release_cb(pgsr->pgsr_private, this_read->io_private); if (this_read->aio) { @@ -207,7 +236,6 @@ pg_streaming_read_prefetch_one(PgStreamingRead *pgsr) this_read = dlist_container(PgStreamingReadItem, node, dlist_pop_head_node(&pgsr->available)); Assert(!this_read->valid); Assert(!this_read->in_progress); - Assert(this_read->read_private == 0); if (this_read->aio == NULL) { @@ -223,14 +251,16 @@ pg_streaming_read_prefetch_one(PgStreamingRead *pgsr) pgsr->prefetched_total_count++; status = pgsr->determine_next_cb(pgsr, pgsr->pgsr_private, - this_read->aio, &this_read->read_private); + this_read->aio, + this_read->io_private); if (status == PGSR_NEXT_END) { pgsr->inflight_count--; pgsr->prefetched_total_count--; pgsr->hit_end = true; - this_read->read_private = 0; + /* FIXME: assert only */ + memset(this_read->io_private, 0x3f, pgsr->per_io_private_size); this_read->valid = false; this_read->in_progress = false; pgaio_io_recycle(this_read->aio); @@ -240,7 +270,6 @@ pg_streaming_read_prefetch_one(PgStreamingRead *pgsr) } else if (status == PGSR_NEXT_NO_IO) { - Assert(this_read->read_private != 0); pgsr->inflight_count--; pgsr->no_io_count++; pgsr->completed_count++; @@ -248,10 +277,6 @@ pg_streaming_read_prefetch_one(PgStreamingRead *pgsr) pgaio_io_recycle(this_read->aio); dlist_delete_from(&pgsr->issued, &this_read->node); } - else - { - Assert(this_read->read_private != 0); - } } static void @@ -321,9 +346,15 @@ pg_streaming_read_prefetch(PgStreamingRead *pgsr) } } -uintptr_t +void * pg_streaming_read_get_next(PgStreamingRead *pgsr) { + if (pgsr->last_returned) + { + 
dlist_push_tail(&pgsr->available, &pgsr->last_returned->node); + pgsr->last_returned = NULL; + } + if (pgsr->prefetched_total_count == 0) { pg_streaming_read_prefetch(pgsr); @@ -338,7 +369,7 @@ pg_streaming_read_get_next(PgStreamingRead *pgsr) else { PgStreamingReadItem *this_read; - uint64_t ret; + void *ret; Assert(pgsr->prefetched_total_count > 0); @@ -354,13 +385,18 @@ pg_streaming_read_get_next(PgStreamingRead *pgsr) Assert(this_read->valid); } - Assert(this_read->read_private != 0); - ret = this_read->read_private; - this_read->read_private = 0; + /* + * Queue this_read to be reused during the next call to + * pg_streaming_read_get_next(). This is deferred, so that the caller + * can use io_private to stash data without needing per-io memory + * allocations. + */ + pgsr->last_returned = this_read; + + ret = this_read->io_private; this_read->valid = false; pgsr->completed_count--; - dlist_push_tail(&pgsr->available, &this_read->node); pg_streaming_read_prefetch(pgsr); return ret; @@ -374,10 +410,17 @@ typedef struct PgStreamingReadBufferPrivate BufferAccessStrategy strategy; } PgStreamingReadBufferPrivate; +typedef struct PgStreamingReadBufferIOPrivate +{ + Buffer buffer; + char buffer_io_private[FLEXIBLE_ARRAY_MEMBER]; +} PgStreamingReadBufferIOPrivate; + static void -pg_streaming_read_buffer_release(uintptr_t pgsr_private, uintptr_t read_private) +pg_streaming_read_buffer_release(uintptr_t pgsr_private, void *read_private) { - Buffer buf = (Buffer) read_private; + PgStreamingReadBufferIOPrivate *buf_io_priv = read_private; + Buffer buf = buf_io_priv->buffer; Assert(BufferIsValid(buf)); ReleaseBuffer(buf); @@ -385,26 +428,24 @@ pg_streaming_read_buffer_release(uintptr_t pgsr_private, uintptr_t read_private) static PgStreamingReadNextStatus pg_streaming_read_buffer_next(PgStreamingRead *pgsr, uintptr_t pgsr_private, - PgAioInProgress *aio, uintptr_t *read_private) + PgAioInProgress *aio, void *read_private) { PgStreamingReadBufferPrivate *pgsr_buf = 
(PgStreamingReadBufferPrivate *) pgsr->layer_private; + PgStreamingReadBufferIOPrivate *buf_io_priv = read_private; Relation rel; ForkNumber fork; BlockNumber blocknum; ReadBufferMode mode; bool already_valid; - Buffer buf; - blocknum = pgsr_buf->next_cb(pgsr, pgsr_private, &rel, &fork, &mode); + blocknum = pgsr_buf->next_cb(pgsr, pgsr_private, &buf_io_priv->buffer_io_private, &rel, &fork, &mode); if (blocknum == InvalidBlockNumber) return PGSR_NEXT_END; - buf = ReadBufferAsync(rel, fork, blocknum, - mode, pgsr_buf->strategy, &already_valid, - &aio); - - *read_private = buf; + buf_io_priv->buffer = + ReadBufferAsync(rel, fork, blocknum, mode, pgsr_buf->strategy, + &already_valid, &aio); if (already_valid) return PGSR_NEXT_NO_IO; @@ -423,7 +464,8 @@ pg_streaming_read_buffer_free(PgStreamingRead *pgsr) } PgStreamingRead * -pg_streaming_read_buffer_alloc(uint32 iodepth, uintptr_t pgsr_private, +pg_streaming_read_buffer_alloc(uint32 iodepth, + uint32 per_io_private_size, uintptr_t pgsr_private, BufferAccessStrategy strategy, PgStreamingReadBufferDetermineNextCB determine_next_cb) { @@ -434,7 +476,15 @@ pg_streaming_read_buffer_alloc(uint32 iodepth, uintptr_t pgsr_private, pgsr_buf->next_cb = determine_next_cb; pgsr_buf->strategy = strategy; - pgsr = pg_streaming_read_alloc(iodepth, pgsr_private, + /* + * We stash extra state in the per-io state. This is largely invisible to + * the caller, because we pass an offset into that allocation to + * determine_next_cb() / return it in pg_streaming_read_buffer_get_next(). 
+ */ + pgsr = pg_streaming_read_alloc(iodepth, + per_io_private_size + + offsetof(PgStreamingReadBufferIOPrivate, buffer_io_private), + pgsr_private, pg_streaming_read_buffer_next, pg_streaming_read_buffer_release); @@ -443,3 +493,19 @@ pg_streaming_read_buffer_alloc(uint32 iodepth, uintptr_t pgsr_private, return pgsr; } + +Buffer +pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **io_private) +{ + PgStreamingReadBufferIOPrivate *buf_io_priv; + + buf_io_priv = pg_streaming_read_get_next(pgsr); + + if (buf_io_priv == NULL) + return InvalidBuffer; + + if (io_private) + *io_private = &buf_io_priv->buffer_io_private; + + return buf_io_priv->buffer; +} diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h index 11b6a9f282b1d..895d17a5ae852 100644 --- a/src/include/storage/streaming_read.h +++ b/src/include/storage/streaming_read.h @@ -31,13 +31,15 @@ typedef enum PgStreamingReadNextStatus typedef PgStreamingReadNextStatus (*PgStreamingReadDetermineNextCB) (PgStreamingRead *pgsr, uintptr_t pgsr_private, - struct PgAioInProgress *aio, uintptr_t *read_private); -typedef void (*PgStreamingReadRelease) (uintptr_t pgsr_private, uintptr_t read_private); -extern PgStreamingRead *pg_streaming_read_alloc(uint32 iodepth, uintptr_t pgsr_private, + struct PgAioInProgress *aio, void *read_private); +typedef void (*PgStreamingReadRelease) (uintptr_t pgsr_private, void *read_private); +extern PgStreamingRead *pg_streaming_read_alloc(uint32 iodepth, + uint32 per_io_private_size, + uintptr_t pgsr_private, PgStreamingReadDetermineNextCB determine_next_cb, PgStreamingReadRelease release_cb); extern void pg_streaming_read_free(PgStreamingRead *pgsr); -extern uintptr_t pg_streaming_read_get_next(PgStreamingRead *pgsr); +extern void *pg_streaming_read_get_next(PgStreamingRead *pgsr); /* * A layer ontop a base PgStreamingRead that makes it easier to work with @@ -49,9 +51,13 @@ struct RelationData; typedef BlockNumber 
(*PgStreamingReadBufferDetermineNextCB) (PgStreamingRead *pgsr, uintptr_t pgsr_private, + void *io_private, struct RelationData **rel, ForkNumber *forkNum, ReadBufferMode *mode); -extern PgStreamingRead *pg_streaming_read_buffer_alloc(uint32 iodepth, uintptr_t pgsr_private, +extern PgStreamingRead *pg_streaming_read_buffer_alloc(uint32 iodepth, + uint32 per_io_private_size, + uintptr_t pgsr_private, BufferAccessStrategy strategy, PgStreamingReadBufferDetermineNextCB determine_next_cb); +extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **io_private); #endif /* STREAMING_READ_H */