diff options
author | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-14 11:45:14 +0000 |
---|---|---|
committer | Teodor Sigaev <teodor@sigaev.ru> | 2005-06-14 11:45:14 +0000 |
commit | 37c839365c566c9e8d7e96cea79e535281144224 (patch) | |
tree | 33a5094b45c6a6b6549e05e4e2dcf1c63cc69d38 /src/backend/access/gist | |
parent | d6636543c4becc4ba9989af8e5b490e1ee2e7c0e (diff) | |
download | postgresql-37c839365c566c9e8d7e96cea79e535281144224.tar.gz |
WAL for GiST. It work for online backup and so on, but on
recovery after crash (power loss etc) it may say that it can't restore
index and index should be reindexed.
Some refactoring code.
Diffstat (limited to 'src/backend/access/gist')
-rw-r--r-- | src/backend/access/gist/Makefile | 4 | ||||
-rw-r--r-- | src/backend/access/gist/gist.c | 1530 | ||||
-rw-r--r-- | src/backend/access/gist/gistget.c | 4 | ||||
-rw-r--r-- | src/backend/access/gist/gistutil.c | 785 | ||||
-rw-r--r-- | src/backend/access/gist/gistxlog.c | 628 |
5 files changed, 1874 insertions, 1077 deletions
diff --git a/src/backend/access/gist/Makefile b/src/backend/access/gist/Makefile index f2ec10a90a..b22f846a23 100644 --- a/src/backend/access/gist/Makefile +++ b/src/backend/access/gist/Makefile @@ -4,7 +4,7 @@ # Makefile for access/gist # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.12 2003/11/29 19:51:39 pgsql Exp $ +# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.13 2005/06/14 11:45:13 teodor Exp $ # #------------------------------------------------------------------------- @@ -12,7 +12,7 @@ subdir = src/backend/access/gist top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = gist.o gistget.o gistscan.o +OBJS = gist.o gistutil.o gistxlog.o gistget.o gistscan.o all: SUBSYS.o diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 2a55c91f0d..4e3faccdf9 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.118 2005/06/06 17:01:21 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -23,43 +23,6 @@ #include "miscadmin.h" #include "utils/memutils.h" - -#undef GIST_PAGEADDITEM - -#define ATTSIZE(datum, tupdesc, i, isnull) \ - ( \ - (isnull) ? 0 : \ - att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \ - ) - -/* result's status */ -#define INSERTED 0x01 -#define SPLITED 0x02 - -/* group flags ( in gistSplit ) */ -#define LEFT_ADDED 0x01 -#define RIGHT_ADDED 0x02 -#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) - -/* - * This defines only for shorter code, used in gistgetadjusted - * and gistadjsubkey only - */ -#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \ - if (isnullkey) { \ - gistentryinit((evp), rkey, r, NULL, \ - (OffsetNumber) 0, rkeyb, FALSE); \ - } else { \ - gistentryinit((evp), okey, r, NULL, \ - (OffsetNumber) 0, okeyb, FALSE); \ - } \ -} while(0) - -#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \ - FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \ - FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \ -} while(0); - /* Working state for gistbuild and its callback */ typedef struct { @@ -80,62 +43,34 @@ static void gistbuildCallback(Relation index, static void gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *GISTstate); -static int gistlayerinsert(Relation r, BlockNumber blkno, - IndexTuple **itup, - int *len, - GISTSTATE *giststate); -static OffsetNumber gistwritebuffer(Relation r, - Page page, - IndexTuple *itup, - int len, - OffsetNumber off); -static bool gistnospace(Page page, IndexTuple *itvec, int len); -static IndexTuple *gistreadbuffer(Buffer buffer, int *len); -static IndexTuple *gistjoinvector( - IndexTuple *itvec, int *len, - IndexTuple *additvec, int addlen); -static IndexTuple gistunion(Relation r, IndexTuple *itvec, - int len, GISTSTATE *giststate); - -static IndexTuple gistgetadjusted(Relation r, - IndexTuple oldtup, - IndexTuple addtup, +static void gistfindleaf(GISTInsertState *state, GISTSTATE *giststate); -static int gistfindgroup(GISTSTATE *giststate, - GISTENTRY *valvec, GIST_SPLITVEC *spl); -static void gistadjsubkey(Relation r, - IndexTuple *itup, int *len, - GIST_SPLITVEC *v, - GISTSTATE *giststate); -static IndexTuple gistFormTuple(GISTSTATE *giststate, - Relation r, Datum *attdata, int *datumsize, bool *isnull); + + +typedef struct PageLayout { + gistxlogPage block; + OffsetNumber *list; + Buffer buffer; /* to write after all proceed */ + + struct PageLayout *next; +} PageLayout; + + +#define ROTATEDIST(d) do { \ + PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \ + memset(tmp,0,sizeof(PageLayout)); \ + tmp->next = (d); \ + (d)=tmp; \ +} while(0) + + static IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup, int *len, + PageLayout **dist, GISTSTATE *giststate); -static void gistnewroot(Relation r, IndexTuple *itup, int len); -static void GISTInitBuffer(Buffer b, uint32 f); -static OffsetNumber gistchoose(Relation r, Page p, - IndexTuple it, - GISTSTATE *giststate); -static void gistdelete(Relation r, ItemPointer tid); - -#ifdef GIST_PAGEADDITEM -static IndexTuple gist_tuple_replacekey(Relation r, - GISTENTRY entry, IndexTuple t); -#endif -static void gistcentryinit(GISTSTATE *giststate, int nkey, - GISTENTRY *e, Datum k, - Relation r, Page pg, - OffsetNumber o, int b, bool l, bool isNull); -static void gistDeCompressAtt(GISTSTATE *giststate, Relation r, - IndexTuple tuple, Page p, OffsetNumber o, - GISTENTRY *attdata, bool *isnull); -static void gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2, - float *penalty); + #undef GISTDEBUG @@ -191,6 +126,26 @@ gistbuild(PG_FUNCTION_ARGS) /* initialize the root page */ buffer = ReadBuffer(index, P_NEW); GISTInitBuffer(buffer, F_LEAF); + if ( !index->rd_istemp ) { + XLogRecPtr recptr; + XLogRecData rdata; + Page page; + + rdata.buffer = InvalidBuffer; + rdata.data = (char*)&(index->rd_node); + rdata.len = sizeof(RelFileNode); + rdata.next = NULL; + + page = BufferGetPage(buffer); + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); + } WriteBuffer(buffer); /* build the index */ @@ -340,51 +295,6 @@ gistinsert(PG_FUNCTION_ARGS) PG_RETURN_BOOL(true); } -#ifdef GIST_PAGEADDITEM -/* - * Take a compressed entry, and install it on a page. Since we now know - * where the entry will live, we decompress it and recompress it using - * that knowledge (some compression routines may want to fish around - * on the page, for example, or do something special for leaf nodes.) - */ -static OffsetNumber -gistPageAddItem(GISTSTATE *giststate, - Relation r, - Page page, - Item item, - Size size, - OffsetNumber offsetNumber, - ItemIdFlags flags, - GISTENTRY *dentry, - IndexTuple *newtup) -{ - GISTENTRY tmpcentry; - IndexTuple itup = (IndexTuple) item; - OffsetNumber retval; - Datum datum; - bool IsNull; - - /* - * recompress the item given that we now know the exact page and - * offset for insertion - */ - datum = index_getattr(itup, 1, r->rd_att, &IsNull); - gistdentryinit(giststate, 0, dentry, datum, - (Relation) 0, (Page) 0, - (OffsetNumber) InvalidOffsetNumber, - ATTSIZE(datum, r, 1, IsNull), - FALSE, IsNull); - gistcentryinit(giststate, 0, &tmpcentry, dentry->key, r, page, - offsetNumber, dentry->bytes, FALSE); - *newtup = gist_tuple_replacekey(r, tmpcentry, itup); - retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup), - offsetNumber, flags); - if (retval == InvalidOffsetNumber) - elog(ERROR, "failed to add index item to \"%s\"", - RelationGetRelationName(r)); - return retval; -} -#endif /* * Workhouse routine for doing insertion into a GiST index. Note that @@ -394,706 +304,365 @@ gistPageAddItem(GISTSTATE *giststate, static void gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) { - IndexTuple *instup; - int ret, - len = 1; + GISTInsertState state; - instup = (IndexTuple *) palloc(sizeof(IndexTuple)); - instup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); - memcpy(instup[0], itup, IndexTupleSize(itup)); + memset(&state, 0, sizeof(GISTInsertState)); - ret = gistlayerinsert(r, GIST_ROOT_BLKNO, &instup, &len, giststate); - if (ret & SPLITED) - gistnewroot(r, instup, len); -} + state.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); + state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup)); + memcpy(state.itup[0], itup, IndexTupleSize(itup)); + state.ituplen=1; + state.r = r; + state.key = itup->t_tid; + state.needInsertComplete = true; + state.xlog_mode = false; -static int -gistlayerinsert(Relation r, BlockNumber blkno, - IndexTuple **itup, /* in - out, has compressed entry */ - int *len, /* in - out */ - GISTSTATE *giststate) -{ - Buffer buffer; - Page page; - int ret; - GISTPageOpaque opaque; + state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack)); + memset( state.stack, 0, sizeof(GISTInsertStack)); + state.stack->blkno=GIST_ROOT_BLKNO; - buffer = ReadBuffer(r, blkno); - page = (Page) BufferGetPage(buffer); - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + gistfindleaf(&state, giststate); + gistmakedeal(&state, giststate); +} - if (!(opaque->flags & F_LEAF)) +static bool +gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) { + bool is_splitted = false; + + if (gistnospace(state->stack->page, state->itup, state->ituplen)) { - /* - * This is an internal page, so continue to walk down the - * tree. We find the child node that has the minimum insertion - * penalty and recursively invoke ourselves to modify that - * node. Once the recursive call returns, we may need to - * adjust the parent node for two reasons: the child node - * split, or the key in this node needs to be adjusted for the - * newly inserted key below us. - */ - ItemId iid; - BlockNumber nblkno; - ItemPointerData oldtid; - IndexTuple oldtup; - OffsetNumber child; - - child = gistchoose(r, page, *(*itup), giststate); - iid = PageGetItemId(page, child); - oldtup = (IndexTuple) PageGetItem(page, iid); - nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid)); + /* no space for insertion */ + IndexTuple *itvec, + *newitup; + int tlen,olen; + PageLayout *dist=NULL, *ptr; + + memset(&dist, 0, sizeof(PageLayout)); + is_splitted = true; + itvec = gistextractbuffer(state->stack->buffer, &tlen); + olen=tlen; + itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); + newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate); + + if ( !state->r->rd_istemp && !state->xlog_mode) { + gistxlogPageSplit xlrec; + XLogRecPtr recptr; + XLogRecData *rdata; + int i, npage = 0, cur=1; + + ptr=dist; + while( ptr ) { + npage++; + ptr=ptr->next; + } - /* - * After this call: 1. if child page was splited, then itup - * contains keys for each page 2. if child page wasn't splited, - * then itup contains additional for adjustment of current key - */ - ret = gistlayerinsert(r, nblkno, itup, len, giststate); + rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2)); + + xlrec.node = state->r->rd_node; + xlrec.origblkno = state->stack->blkno; + xlrec.npage = npage; + xlrec.nitup = state->ituplen; + xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; + xlrec.key = state->key; + xlrec.pathlen = (uint16)state->pathlen; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogPageSplit ); + rdata[0].next = NULL; + + if ( state->pathlen>=0 ) { + rdata[0].next = &(rdata[1]); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) (state->path); + rdata[1].len = sizeof( BlockNumber ) * state->pathlen; + rdata[1].next = NULL; + cur++; + } + + /* new tuples */ + for(i=0;i<state->ituplen;i++) { + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(state->itup[i]); + rdata[cur].len = IndexTupleSize(state->itup[i]); + rdata[cur-1].next = &(rdata[cur]); + cur++; + } - /* nothing inserted in child */ - if (!(ret & INSERTED)) - { - ReleaseBuffer(buffer); - return 0x00; - } + /* new page layout */ + ptr=dist; + while(ptr) { + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)&(ptr->block); + rdata[cur].len = sizeof(gistxlogPage); + rdata[cur-1].next = &(rdata[cur]); + cur++; + + rdata[cur].buffer = InvalidBuffer; + rdata[cur].data = (char*)(ptr->list); + rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num); + if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num ) + rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len ); + rdata[cur-1].next = &(rdata[cur]); + rdata[cur].next=NULL; + cur++; + + ptr=ptr->next; + } - /* child did not split */ - if (!(ret & SPLITED)) - { - IndexTuple newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate); + START_CRIT_SECTION(); - if (!newtup) - { - /* not need to update key */ - ReleaseBuffer(buffer); - return 0x00; + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata); + ptr = dist; + while(ptr) { + PageSetLSN(BufferGetPage(ptr->buffer), recptr); + PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID); + ptr=ptr->next; } - (*itup)[0] = newtup; + END_CRIT_SECTION(); } - /* - * This node's key has been modified, either because a child - * split occurred or because we needed to adjust our key for - * an insert in a child node. Therefore, remove the old - * version of this node's key. - */ - ItemPointerSet(&oldtid, blkno, child); - gistdelete(r, &oldtid); - - /* - * if child was splitted, new key for child will be inserted in - * the end list of child, so we must say to any scans that page is - * changed beginning from 'child' offset - */ - if (ret & SPLITED) - gistadjscans(r, GISTOP_SPLIT, blkno, child); - } + ptr = dist; + while(ptr) { + WriteBuffer(ptr->buffer); + ptr=ptr->next; + } - ret = INSERTED; + state->itup = newitup; + state->ituplen = tlen; /* now tlen >= 2 */ - if (gistnospace(page, (*itup), *len)) - { - /* no space for insertion */ - IndexTuple *itvec, - *newitup; - int tlen, - oldlen; - - ret |= SPLITED; - itvec = gistreadbuffer(buffer, &tlen); - itvec = gistjoinvector(itvec, &tlen, (*itup), *len); - oldlen = *len; - newitup = gistSplit(r, buffer, itvec, &tlen, giststate); - ReleaseBuffer(buffer); - *itup = newitup; - *len = tlen; /* now tlen >= 2 */ + if ( state->stack->blkno == GIST_ROOT_BLKNO ) { + gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode); + state->needInsertComplete=false; + } + if ( state->xlog_mode ) + LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(state->stack->buffer); } else { /* enough space */ - OffsetNumber off, - l; + OffsetNumber off, l; - off = (PageIsEmpty(page)) ? + off = (PageIsEmpty(state->stack->page)) ? FirstOffsetNumber : - OffsetNumberNext(PageGetMaxOffsetNumber(page)); - l = gistwritebuffer(r, page, *itup, *len, off); - WriteBuffer(buffer); + OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page)); + l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off); + if ( !state->r->rd_istemp && !state->xlog_mode) { + gistxlogEntryUpdate xlrec; + XLogRecPtr recptr; + XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) ); + int i, cur=0; + + xlrec.node = state->r->rd_node; + xlrec.blkno = state->stack->blkno; + xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber; + xlrec.key = state->key; + xlrec.pathlen = (uint16)state->pathlen; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogEntryUpdate ); + rdata[0].next = NULL; + + if ( state->pathlen>=0 ) { + rdata[0].next = &(rdata[1]); + rdata[1].buffer = InvalidBuffer; + rdata[1].data = (char *) (state->path); + rdata[1].len = sizeof( BlockNumber ) * state->pathlen; + rdata[1].next = NULL; + cur++; + } + + for(i=1; i<=state->ituplen; i++) { /* adding tuples */ + rdata[i+cur].buffer = InvalidBuffer; + rdata[i+cur].data = (char*)(state->itup[i-1]); + rdata[i+cur].len = IndexTupleSize(state->itup[i-1]); + rdata[i+cur].next = NULL; + rdata[i-1+cur].next = &(rdata[i+cur]); + } + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata); + PageSetLSN(state->stack->page, recptr); + PageSetTLI(state->stack->page, ThisTimeLineID); + + END_CRIT_SECTION(); + } + + if ( state->stack->blkno == GIST_ROOT_BLKNO ) + state->needInsertComplete=false; - if (*len > 1) - { /* previous insert ret & SPLITED != 0 */ + if ( state->xlog_mode ) + LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(state->stack->buffer); + + if (state->ituplen > 1) + { /* previous is_splitted==true */ /* * child was splited, so we must form union for insertion in * parent */ - IndexTuple newtup = gistunion(r, (*itup), *len, giststate); - ItemPointerSet(&(newtup->t_tid), blkno, 1); - (*itup)[0] = newtup; - *len = 1; + IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); + ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber); + state->itup[0] = newtup; + state->ituplen = 1; } } - - return ret; -} - -/* - * Write itup vector to page, has no control of free space - */ -static OffsetNumber -gistwritebuffer(Relation r, Page page, IndexTuple *itup, - int len, OffsetNumber off) -{ - OffsetNumber l = InvalidOffsetNumber; - int i; - - for (i = 0; i < len; i++) - { -#ifdef GIST_PAGEADDITEM - GISTENTRY tmpdentry; - IndexTuple newtup; - bool IsNull; - - l = gistPageAddItem(giststate, r, page, - (Item) itup[i], IndexTupleSize(itup[i]), - off, LP_USED, &tmpdentry, &newtup); - off = OffsetNumberNext(off); -#else - l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]), - off, LP_USED); - if (l == InvalidOffsetNumber) - elog(ERROR, "failed to add index item to \"%s\"", - RelationGetRelationName(r)); -#endif - } - return l; + return is_splitted; } -/* - * Check space for itup vector on page - */ -static bool -gistnospace(Page page, IndexTuple *itvec, int len) -{ - unsigned int size = 0; - int i; - - for (i = 0; i < len; i++) - size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData); - - return (PageGetFreeSpace(page) < size); -} - -/* - * Read buffer into itup vector - */ -static IndexTuple * -gistreadbuffer(Buffer buffer, int *len /* out */ ) -{ - OffsetNumber i, - maxoff; - IndexTuple *itvec; - Page p = (Page) BufferGetPage(buffer); - - maxoff = PageGetMaxOffsetNumber(p); - *len = maxoff; - itvec = palloc(sizeof(IndexTuple) * maxoff); - for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) - itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - - return itvec; -} - -/* - * join two vectors into one - */ -static IndexTuple * -gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen) -{ - itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen)); - memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen); - *len += addlen; - return itvec; -} - -/* - * Return an IndexTuple containing the result of applying the "union" - * method to the specified IndexTuple vector. - */ -static IndexTuple -gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) +static void +gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) { - Datum attr[INDEX_MAX_KEYS]; - bool isnull[INDEX_MAX_KEYS]; - GistEntryVector *evec; - int i; - GISTENTRY centry[INDEX_MAX_KEYS]; - - evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - - for (i = 0; i < r->rd_att->natts; i++) - { - Datum datum; - int j; - int real_len; + ItemId iid; + IndexTuple oldtup; + GISTInsertStack *ptr; - real_len = 0; - for (j = 0; j < len; j++) - { - bool IsNull; - datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); - if (IsNull) - continue; - - gistdentryinit(giststate, i, - &(evec->vector[real_len]), - datum, - NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), - FALSE, IsNull); - real_len++; - } + /* walk down */ + while( true ) { + GISTPageOpaque opaque; - /* If this tuple vector was all NULLs, the union is NULL */ - if (real_len == 0) - { - attr[i] = (Datum) 0; - isnull[i] = TRUE; - } - else + state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); + state->stack->page = (Page) BufferGetPage(state->stack->buffer); + opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page); + + if (!(opaque->flags & F_LEAF)) { - int datumsize; - - if (real_len == 1) - { - evec->n = 2; - gistentryinit(evec->vector[1], - evec->vector[0].key, r, NULL, - (OffsetNumber) 0, evec->vector[0].bytes, FALSE); - } - else - evec->n = real_len; - - /* Compress the result of the union and store in attr array */ - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - gistcentryinit(giststate, i, ¢ry[i], datum, - NULL, NULL, (OffsetNumber) 0, - datumsize, FALSE, FALSE); - isnull[i] = FALSE; - attr[i] = centry[i].key; - } + /* + * This is an internal page, so continue to walk down the + * tree. We find the child node that has the minimum insertion + * penalty and recursively invoke ourselves to modify that + * node. Once the recursive call returns, we may need to + * adjust the parent node for two reasons: the child node + * split, or the key in this node needs to be adjusted for the + * newly inserted key below us. + */ + GISTInsertStack *item=(GISTInsertStack*)palloc(sizeof(GISTInsertStack)); + + state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate); + + iid = PageGetItemId(state->stack->page, state->stack->childoffnum); + oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); + item->blkno = ItemPointerGetBlockNumber(&(oldtup->t_tid)); + item->parent = state->stack; + item->todelete = false; + state->stack = item; + } else + break; } - return index_form_tuple(giststate->tupdesc, attr, isnull); -} - - -/* - * Forms union of oldtup and addtup, if union == oldtup then return NULL - */ -static IndexTuple -gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate) -{ - GistEntryVector *evec; - bool neednew = false; - bool isnull[INDEX_MAX_KEYS]; - Datum attr[INDEX_MAX_KEYS]; - GISTENTRY centry[INDEX_MAX_KEYS], - oldatt[INDEX_MAX_KEYS], - addatt[INDEX_MAX_KEYS], - *ev0p, - *ev1p; - bool oldisnull[INDEX_MAX_KEYS], - addisnull[INDEX_MAX_KEYS]; - IndexTuple newtup = NULL; - int i; - - evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); - evec->n = 2; - ev0p = &(evec->vector[0]); - ev1p = &(evec->vector[1]); - - gistDeCompressAtt(giststate, r, oldtup, NULL, - (OffsetNumber) 0, oldatt, oldisnull); - - gistDeCompressAtt(giststate, r, addtup, NULL, - (OffsetNumber) 0, addatt, addisnull); - - for (i = 0; i < r->rd_att->natts; i++) - { - if (oldisnull[i] && addisnull[i]) - { - attr[i] = (Datum) 0; - isnull[i] = TRUE; - } - else - { - Datum datum; - int datumsize; - - FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes, - addisnull[i], addatt[i].key, addatt[i].bytes); - - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - if (oldisnull[i] || addisnull[i]) - { - if (oldisnull[i]) - neednew = true; - } - else - { - bool result; - - FunctionCall3(&giststate->equalFn[i], - ev0p->key, - datum, - PointerGetDatum(&result)); + /* now state->stack->(page, buffer and blkno) points to leaf page, so insert */ - if (!result) - neednew = true; - } - - gistcentryinit(giststate, i, ¢ry[i], datum, - NULL, NULL, (OffsetNumber) 0, - datumsize, FALSE, FALSE); - - attr[i] = centry[i].key; - isnull[i] = FALSE; - } + /* form state->path to work xlog */ + ptr = state->stack; + state->pathlen=1; + while( ptr ) { + state->pathlen++; + ptr=ptr->parent; } - - if (neednew) - { - /* need to update key */ - newtup = index_form_tuple(giststate->tupdesc, attr, isnull); - newtup->t_tid = oldtup->t_tid; + state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen); + ptr = state->stack; + state->pathlen=0; + while( ptr ) { + state->path[ state->pathlen ] = ptr->blkno; + state->pathlen++; + ptr=ptr->parent; } - - return newtup; + state->pathlen--; + state->path++; } -static void -gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) -{ - int lr; - - for (lr = 0; lr < 2; lr++) - { - OffsetNumber *entries; - int i; - Datum *attr; - int len, - *attrsize; - bool *isnull; - GistEntryVector *evec; - - if (lr) - { - attrsize = spl->spl_lattrsize; - attr = spl->spl_lattr; - len = spl->spl_nleft; - entries = spl->spl_left; - isnull = spl->spl_lisnull; - } - else - { - attrsize = spl->spl_rattrsize; - attr = spl->spl_rattr; - len = spl->spl_nright; - entries = spl->spl_right; - isnull = spl->spl_risnull; - } - - evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); - - for (i = 1; i < r->rd_att->natts; i++) - { - int j; - Datum datum; - int datumsize; - int real_len; - - real_len = 0; - for (j = 0; j < len; j++) - { - bool IsNull; - - if (spl->spl_idgrp[entries[j]]) - continue; - datum = index_getattr(itvec[entries[j] - 1], i + 1, - giststate->tupdesc, &IsNull); - if (IsNull) - continue; - gistdentryinit(giststate, i, - &(evec->vector[real_len]), - datum, - NULL, NULL, (OffsetNumber) 0, - ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), - FALSE, IsNull); - real_len++; - - } - if (real_len == 0) - { - datum = (Datum) 0; - datumsize = 0; - isnull[i] = true; - } - else - { - /* - * evec->vector[0].bytes may be not defined, so form union - * with itself - */ - if (real_len == 1) - { - evec->n = 2; - memcpy(&(evec->vector[1]), &(evec->vector[0]), - sizeof(GISTENTRY)); - } - else - evec->n = real_len; - datum = FunctionCall2(&giststate->unionFn[i], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - isnull[i] = false; - } +void +gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) { + int is_splitted; + ItemId iid; + IndexTuple oldtup, newtup; - attr[i] = datum; - attrsize[i] = datumsize; - } - } -} + /* walk up */ + while( true ) { + /* + * After this call: 1. if child page was splited, then itup + * contains keys for each page 2. if child page wasn't splited, + * then itup contains additional for adjustment of current key + */ -/* - * find group in vector with equal value - */ -static int -gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) -{ - int i; - int curid = 1; - - /* - * first key is always not null (see gistinsert), so we may not check - * for nulls - */ - for (i = 0; i < spl->spl_nleft; i++) - { - int j; - int len; - bool result; - - if (spl->spl_idgrp[spl->spl_left[i]]) - continue; - len = 0; - /* find all equal value in right part */ - for (j = 0; j < spl->spl_nright; j++) - { - if (spl->spl_idgrp[spl->spl_right[j]]) - continue; - FunctionCall3(&giststate->equalFn[0], - valvec[spl->spl_left[i]].key, - valvec[spl->spl_right[j]].key, - PointerGetDatum(&result)); - if (result) - { - spl->spl_idgrp[spl->spl_right[j]] = curid; - len++; - } - } - /* find all other equal value in left part */ - if (len) - { - /* add current val to list of equal values */ - spl->spl_idgrp[spl->spl_left[i]] = curid; - /* searching .. */ - for (j = i + 1; j < spl->spl_nleft; j++) - { - if (spl->spl_idgrp[spl->spl_left[j]]) - continue; - FunctionCall3(&giststate->equalFn[0], - valvec[spl->spl_left[i]].key, - valvec[spl->spl_left[j]].key, - PointerGetDatum(&result)); - if (result) - { - spl->spl_idgrp[spl->spl_left[j]] = curid; - len++; - } - } - spl->spl_ngrp[curid] = len + 1; - curid++; - } - } + is_splitted = gistplacetopage(state, giststate ); - return curid; -} + /* pop page from stack */ + state->stack = state->stack->parent; + state->pathlen--; + state->path++; + + /* stack is void */ + if ( ! state->stack ) + break; -/* - * Insert equivalent tuples to left or right page with minimum - * penalty - */ -static void -gistadjsubkey(Relation r, - IndexTuple *itup, /* contains compressed entry */ - int *len, - GIST_SPLITVEC *v, - GISTSTATE *giststate) -{ - int curlen; - OffsetNumber *curwpos; - GISTENTRY entry, - identry[INDEX_MAX_KEYS], - *ev0p, - *ev1p; - float lpenalty, - rpenalty; - GistEntryVector *evec; - int datumsize; - bool isnull[INDEX_MAX_KEYS]; - int i, - j; - /* clear vectors */ - curlen = v->spl_nleft; - curwpos = v->spl_left; - for (i = 0; i < v->spl_nleft; i++) - { - if (v->spl_idgrp[v->spl_left[i]] == 0) + /* child did not split */ + if (!is_splitted) { - *curwpos = v->spl_left[i]; - curwpos++; - } - else - curlen--; - } - v->spl_nleft = curlen; + /* parent's tuple */ + iid = PageGetItemId(state->stack->page, state->stack->childoffnum); + oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); + newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate); + + if (!newtup) /* not need to update key */ + break; - curlen = v->spl_nright; - curwpos = v->spl_right; - for (i = 0; i < v->spl_nright; i++) - { - if (v->spl_idgrp[v->spl_right[i]] == 0) - { - *curwpos = v->spl_right[i]; - curwpos++; + state->itup[0] = newtup; } - else - curlen--; + + /* + * This node's key has been modified, either because a child + * split occurred or because we needed to adjust our key for + * an insert in a child node. Therefore, remove the old + * version of this node's key. + */ + gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum); + PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); + if ( !state->r->rd_istemp ) + state->stack->todelete = true; + + /* + * if child was splitted, new key for child will be inserted in + * the end list of child, so we must say to any scans that page is + * changed beginning from 'child' offset + */ + if (is_splitted) + gistadjscans(state->r, GISTOP_SPLIT, state->stack->blkno, state->stack->childoffnum); + } /* while */ + + /* release all buffers */ + while( state->stack ) { + if ( state->xlog_mode ) + LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(state->stack->buffer); + state->stack = state->stack->parent; } - v->spl_nright = curlen; - evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); - evec->n = 2; - ev0p = &(evec->vector[0]); - ev1p = &(evec->vector[1]); + /* say to xlog that insert is completed */ + if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) { + gistxlogInsertComplete xlrec; + XLogRecData rdata; + + xlrec.node = state->r->rd_node; + xlrec.key = state->key; + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof( gistxlogInsertComplete ); + rdata.next = NULL; - /* add equivalent tuple */ - for (i = 0; i < *len; i++) - { - Datum datum; + START_CRIT_SECTION(); - if (v->spl_idgrp[i + 1] == 0) /* already inserted */ - continue; - gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0, - identry, isnull); + XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata); - v->spl_ngrp[v->spl_idgrp[i + 1]]--; - if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 && - (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED) - { - /* force last in group */ - rpenalty = 1.0; - lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0; - } - else - { - /* where? */ - for (j = 1; j < r->rd_att->natts; j++) - { - gistentryinit(entry, v->spl_lattr[j], r, NULL, - (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); - gistpenalty(giststate, j, &entry, v->spl_lisnull[j], - &identry[j], isnull[j], &lpenalty); - - gistentryinit(entry, v->spl_rattr[j], r, NULL, - (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); - gistpenalty(giststate, j, &entry, v->spl_risnull[j], - &identry[j], isnull[j], &rpenalty); - - if (lpenalty != rpenalty) - break; - } - } - - /* - * add - * XXX: refactor this to avoid duplicating code - */ - if (lpenalty < rpenalty) - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED; - v->spl_left[v->spl_nleft] = i + 1; - v->spl_nleft++; - for (j = 1; j < r->rd_att->natts; j++) - { - if (isnull[j] && v->spl_lisnull[j]) - { - v->spl_lattr[j] = (Datum) 0; - v->spl_lattrsize[j] = 0; - } - else - { - FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j], - isnull[j], identry[j].key, identry[j].bytes); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - v->spl_lattr[j] = datum; - v->spl_lattrsize[j] = datumsize; - v->spl_lisnull[j] = false; - } - } - } - else - { - v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED; - v->spl_right[v->spl_nright] = i + 1; - v->spl_nright++; - for (j = 1; j < r->rd_att->natts; j++) - { - if (isnull[j] && v->spl_risnull[j]) - { - v->spl_rattr[j] = (Datum) 0; - v->spl_rattrsize[j] = 0; - } - else - { - FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j], - isnull[j], identry[j].key, identry[j].bytes); - - datum = FunctionCall2(&giststate->unionFn[j], - PointerGetDatum(evec), - PointerGetDatum(&datumsize)); - - v->spl_rattr[j] = datum; - v->spl_rattrsize[j] = datumsize; - v->spl_risnull[j] = false; - } - } - } + END_CRIT_SECTION(); } } @@ -1105,6 +674,7 @@ gistSplit(Relation r, Buffer buffer, IndexTuple *itup, /* contains compressed entry */ int *len, + PageLayout **dist, GISTSTATE *giststate) { Page p; @@ -1225,29 +795,57 @@ gistSplit(Relation r, /* write on disk (may need another split) */ if (gistnospace(right, rvectup, v.spl_nright)) { + int i; + PageLayout *d, *origd=*dist; + nlen = v.spl_nright; - newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate); + newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate); + /* XLOG stuff */ + d=*dist; + /* translate offsetnumbers to our */ + while( d && d!=origd ) { + for(i=0;i<d->block.num;i++) + d->list[i] = v.spl_right[ d->list[i]-1 ]; + d=d->next; + } ReleaseBuffer(rightbuf); } else { OffsetNumber l; - l = gistwritebuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber); - WriteBuffer(rightbuf); - + l = gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber); + /* XLOG stuff */ + ROTATEDIST(*dist); + (*dist)->block.blkno = BufferGetBlockNumber(rightbuf); + (*dist)->block.num = v.spl_nright; + (*dist)->list = v.spl_right; + (*dist)->buffer = rightbuf; + nlen = 1; newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1); newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull); - ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1); + ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber); } if (gistnospace(left, lvectup, v.spl_nleft)) { int llen = v.spl_nleft; IndexTuple *lntup; - - lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate); + int i; + PageLayout *d, *origd=*dist; + + lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate); + + /* XLOG stuff */ + d=*dist; + /* translate offsetnumbers to our */ + while( d && d!=origd ) { + for(i=0;i<d->block.num;i++) + d->list[i] = v.spl_left[ d->list[i]-1 ]; + d=d->next; + } + ReleaseBuffer(leftbuf); newtup = gistjoinvector(newtup, &nlen, lntup, llen); @@ -1256,151 +854,76 @@ gistSplit(Relation r, { OffsetNumber l; - l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber); + l = gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber); if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO) PageRestoreTempPage(left, p); - WriteBuffer(leftbuf); - + /* XLOG stuff */ + ROTATEDIST(*dist); + (*dist)->block.blkno = BufferGetBlockNumber(leftbuf); + (*dist)->block.num = v.spl_nleft; + (*dist)->list = v.spl_left; + (*dist)->buffer = leftbuf; + nlen += 1; newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen); newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull); - ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1); + ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber); } *len = nlen; return newtup; } -static void -gistnewroot(Relation r, IndexTuple *itup, int len) -{ - Buffer b; - Page p; - - b = ReadBuffer(r, GIST_ROOT_BLKNO); - GISTInitBuffer(b, 0); - p = BufferGetPage(b); - - gistwritebuffer(r, p, itup, len, FirstOffsetNumber); - WriteBuffer(b); -} - -static void -GISTInitBuffer(Buffer b, uint32 f) +void +gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode) { - GISTPageOpaque opaque; + Buffer buffer; Page page; - Size pageSize; - - pageSize = BufferGetPageSize(b); - page = BufferGetPage(b); - PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); - - opaque = (GISTPageOpaque) PageGetSpecialPointer(page); - opaque->flags = f; -} - -/* - * find entry with lowest penalty - */ -static OffsetNumber -gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ - GISTSTATE *giststate) -{ - OffsetNumber maxoff; - OffsetNumber i; - OffsetNumber which; - float sum_grow, - which_grow[INDEX_MAX_KEYS]; - GISTENTRY entry, - identry[INDEX_MAX_KEYS]; - bool isnull[INDEX_MAX_KEYS]; - - maxoff = PageGetMaxOffsetNumber(p); - *which_grow = -1.0; - which = -1; - sum_grow = 1; - gistDeCompressAtt(giststate, r, - it, NULL, (OffsetNumber) 0, - identry, isnull); - - for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) - { - int j; - IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); - sum_grow = 0; - for (j = 0; j < r->rd_att->natts; j++) - { - Datum datum; - float usize; - bool IsNull; - - datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); - gistdentryinit(giststate, j, &entry, datum, r, p, i, - ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), - FALSE, IsNull); - gistpenalty(giststate, j, &entry, IsNull, - &identry[j], isnull[j], &usize); - - if (which_grow[j] < 0 || usize < which_grow[j]) - { - which = i; - which_grow[j] = usize; - if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber) - which_grow[j + 1] = -1; - sum_grow += which_grow[j]; - } - else if (which_grow[j] == usize) - sum_grow += usize; - else - { - sum_grow = 1; - break; - } - } + buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO); + GISTInitBuffer(buffer, 0); + page = BufferGetPage(buffer); + + gistfillbuffer(r, page, itup, len, FirstOffsetNumber); + if ( !xlog_mode && !r->rd_istemp ) { + gistxlogEntryUpdate xlrec; + XLogRecPtr recptr; + XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) ); + int i; + + xlrec.node = r->rd_node; + xlrec.blkno = GIST_ROOT_BLKNO; + xlrec.todeleteoffnum = InvalidOffsetNumber; + xlrec.key = *key; + xlrec.pathlen=0; + + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof( gistxlogEntryUpdate ); + rdata[0].next = NULL; + + for(i=1; i<=len; i++) { + rdata[i].buffer = InvalidBuffer; + rdata[i].data = (char*)(itup[i-1]); + rdata[i].len = IndexTupleSize(itup[i-1]); + rdata[i].next = NULL; + rdata[i-1].next = &(rdata[i]); + } + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); } - - return which; + if ( xlog_mode ) + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); } -/* - * Retail deletion of a single tuple. - * - * NB: this is no longer called externally, but is still needed by - * gistlayerinsert(). That dependency will have to be fixed if GIST - * is ever going to allow concurrent insertions. - */ -static void -gistdelete(Relation r, ItemPointer tid) -{ - BlockNumber blkno; - OffsetNumber offnum; - Buffer buf; - Page page; - - /* - * Since GIST is not marked "amconcurrent" in pg_am, caller should - * have acquired exclusive lock on index relation. We need no locking - * here. - */ - - blkno = ItemPointerGetBlockNumber(tid); - offnum = ItemPointerGetOffsetNumber(tid); - - /* adjust any scans that will be affected by this deletion */ - /* NB: this works only for scans in *this* backend! */ - gistadjscans(r, GISTOP_DEL, blkno, offnum); - - /* delete the index tuple */ - buf = ReadBuffer(r, blkno); - page = BufferGetPage(buf); - - PageIndexTupleDelete(page, offnum); - - WriteBuffer(buf); -} /* * Bulk deletion of all index entries pointing to a set of heap tuples. @@ -1462,6 +985,30 @@ gistbulkdelete(PG_FUNCTION_ARGS) page = BufferGetPage(buf); PageIndexTupleDelete(page, offnum); + if ( !rel->rd_istemp ) { + gistxlogEntryUpdate xlrec; + XLogRecPtr recptr; + XLogRecData rdata; + + xlrec.node = rel->rd_node; + xlrec.blkno = blkno; + xlrec.todeleteoffnum = offnum; + xlrec.pathlen=0; + ItemPointerSetInvalid( &(xlrec.key) ); + + rdata.buffer = InvalidBuffer; + rdata.data = (char *) &xlrec; + rdata.len = sizeof( gistxlogEntryUpdate ); + rdata.next = NULL; + + START_CRIT_SECTION(); + + recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata); + PageSetLSN(page, recptr); + PageSetTLI(page, ThisTimeLineID); + + END_CRIT_SECTION(); + } WriteBuffer(buf); @@ -1527,159 +1074,6 @@ freeGISTstate(GISTSTATE *giststate) /* no work */ } -#ifdef GIST_PAGEADDITEM -/* - * Given an IndexTuple to be inserted on a page, this routine replaces - * the key with another key, which may involve generating a new IndexTuple - * if the sizes don't match or if the null status changes. - * - * XXX this only works for a single-column index tuple! - */ -static IndexTuple -gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t) -{ - bool IsNull; - Datum datum = index_getattr(t, 1, r->rd_att, &IsNull); - - /* - * If new entry fits in index tuple, copy it in. To avoid worrying - * about null-value bitmask, pass it off to the general - * index_form_tuple routine if either the previous or new value is - * NULL. - */ - if (!IsNull && DatumGetPointer(entry.key) != NULL && - (Size) entry.bytes <= ATTSIZE(datum, r, 1, IsNull)) - { - memcpy(DatumGetPointer(datum), - DatumGetPointer(entry.key), - entry.bytes); - /* clear out old size */ - t->t_info &= ~INDEX_SIZE_MASK; - /* or in new size */ - t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData)); - - return t; - } - else - { - /* generate a new index tuple for the compressed entry */ - TupleDesc tupDesc = r->rd_att; - IndexTuple newtup; - bool isnull; - - isnull = (DatumGetPointer(entry.key) == NULL); - newtup = index_form_tuple(tupDesc, &(entry.key), &isnull); - newtup->t_tid = t->t_tid; - return newtup; - } -} -#endif - -/* - * initialize a GiST entry with a decompressed version of key - */ -void -gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, - Datum k, Relation r, Page pg, OffsetNumber o, - int b, bool l, bool isNull) -{ - if (b && !isNull) - { - GISTENTRY *dep; - - gistentryinit(*e, k, r, pg, o, b, l); - dep = (GISTENTRY *) - DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], - PointerGetDatum(e))); - /* decompressFn may just return the given pointer */ - if (dep != e) - gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, - dep->bytes, dep->leafkey); - } - else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); -} - - -/* - * initialize a GiST entry with a compressed version of key - */ -static void -gistcentryinit(GISTSTATE *giststate, int nkey, - GISTENTRY *e, Datum k, Relation r, - Page pg, OffsetNumber o, int b, bool l, bool isNull) -{ - if (!isNull) - { - GISTENTRY *cep; - - gistentryinit(*e, k, r, pg, o, b, l); - cep = (GISTENTRY *) - DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], - PointerGetDatum(e))); - /* compressFn may just return the given pointer */ - if (cep != e) - gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, - cep->bytes, cep->leafkey); - } - else - gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); -} - -static IndexTuple -gistFormTuple(GISTSTATE *giststate, Relation r, - Datum attdata[], int datumsize[], bool isnull[]) -{ - GISTENTRY centry[INDEX_MAX_KEYS]; - Datum compatt[INDEX_MAX_KEYS]; - int i; - - for (i = 0; i < r->rd_att->natts; i++) - { - if (isnull[i]) - compatt[i] = (Datum) 0; - else - { - gistcentryinit(giststate, i, ¢ry[i], attdata[i], - NULL, NULL, (OffsetNumber) 0, - datumsize[i], FALSE, FALSE); - compatt[i] = centry[i].key; - } - } - - return index_form_tuple(giststate->tupdesc, compatt, isnull); -} - -static void -gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, - OffsetNumber o, GISTENTRY *attdata, bool *isnull) -{ - int i; - - for (i = 0; i < r->rd_att->natts; i++) - { - Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); - gistdentryinit(giststate, i, &attdata[i], - datum, r, p, o, - ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), - FALSE, isnull[i]); - } -} - -static void -gistpenalty(GISTSTATE *giststate, int attno, - GISTENTRY *key1, bool isNull1, - GISTENTRY *key2, bool isNull2, float *penalty) -{ - if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2)) - *penalty = 0.0; - else - FunctionCall3(&giststate->penaltyFn[attno], - PointerGetDatum(key1), - PointerGetDatum(key2), - PointerGetDatum(penalty)); -} - #ifdef GISTDEBUG static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) @@ -1726,13 +1120,3 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) } #endif /* defined GISTDEBUG */ -void -gist_redo(XLogRecPtr lsn, XLogRecord *record) -{ - elog(PANIC, "gist_redo: unimplemented"); -} - -void -gist_desc(char *buf, uint8 xl_info, char *rec) -{ -} diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index e5f77b6792..5b9a94471b 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,14 +8,14 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.47 2005/05/17 03:34:18 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.48 2005/06/14 11:45:13 teodor Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" -#include "access/gist_private.h" #include "access/itup.h" +#include "access/gist_private.h" #include "executor/execdebug.h" #include "utils/memutils.h" diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c new file mode 100644 index 0000000000..44391f9f73 --- /dev/null +++ b/src/backend/access/gist/gistutil.c @@ -0,0 +1,785 @@ +/*------------------------------------------------------------------------- + * + * gistutil.c + * utilities routines for the postgres GiST index access method. + * + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.1 2005/06/14 11:45:13 teodor Exp $ + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/genam.h" +#include "access/gist_private.h" +#include "access/gistscan.h" +#include "access/heapam.h" +#include "catalog/index.h" +#include "miscadmin.h" + +/* group flags ( in gistadjsubkey ) */ +#define LEFT_ADDED 0x01 +#define RIGHT_ADDED 0x02 +#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED ) + + +/* + * This defines is only for shorter code, used in gistgetadjusted + * and gistadjsubkey only + */ +#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \ + if (isnullkey) { \ + gistentryinit((evp), rkey, r, NULL, \ + (OffsetNumber) 0, rkeyb, FALSE); \ + } else { \ + gistentryinit((evp), okey, r, NULL, \ + (OffsetNumber) 0, okeyb, FALSE); \ + } \ +} while(0) + +#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \ + FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \ + FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \ +} while(0); + + +static void +gistpenalty(GISTSTATE *giststate, int attno, + GISTENTRY *key1, bool isNull1, + GISTENTRY *key2, bool isNull2, float *penalty); + +/* + * Write itup vector to page, has no control of free space + */ +OffsetNumber +gistfillbuffer(Relation r, Page page, IndexTuple *itup, + int len, OffsetNumber off) +{ + OffsetNumber l = InvalidOffsetNumber; + int i; + + for (i = 0; i < len; i++) + { + l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]), + off, LP_USED); + if (l == InvalidOffsetNumber) + elog(ERROR, "gistfillbuffer: failed to add index item to \"%s\"", + RelationGetRelationName(r)); + off++; + } + return l; +} + +/* + * Check space for itup vector on page + */ +bool +gistnospace(Page page, IndexTuple *itvec, int len) +{ + unsigned int size = 0; + int i; + + for (i = 0; i < len; i++) + size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData); + + return (PageGetFreeSpace(page) < size); +} + +/* + * Read buffer into itup vector + */ +IndexTuple * +gistextractbuffer(Buffer buffer, int *len /* out */ ) +{ + OffsetNumber i, + maxoff; + IndexTuple *itvec; + Page p = (Page) BufferGetPage(buffer); + + maxoff = PageGetMaxOffsetNumber(p); + *len = maxoff; + itvec = palloc(sizeof(IndexTuple) * maxoff); + for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) + itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); + + return itvec; +} + +/* + * join two vectors into one + */ +IndexTuple * +gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen) +{ + itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen)); + memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen); + *len += addlen; + return itvec; +} + +/* + * Return an IndexTuple containing the result of applying the "union" + * method to the specified IndexTuple vector. + */ +IndexTuple +gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) +{ + Datum attr[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + GistEntryVector *evec; + int i; + GISTENTRY centry[INDEX_MAX_KEYS]; + + evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); + + for (i = 0; i < r->rd_att->natts; i++) + { + Datum datum; + int j; + int real_len; + + real_len = 0; + for (j = 0; j < len; j++) + { + bool IsNull; + datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); + if (IsNull) + continue; + + gistdentryinit(giststate, i, + &(evec->vector[real_len]), + datum, + NULL, NULL, (OffsetNumber) 0, + ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), + FALSE, IsNull); + real_len++; + } + + /* If this tuple vector was all NULLs, the union is NULL */ + if (real_len == 0) + { + attr[i] = (Datum) 0; + isnull[i] = TRUE; + } + else + { + int datumsize; + + if (real_len == 1) + { + evec->n = 2; + gistentryinit(evec->vector[1], + evec->vector[0].key, r, NULL, + (OffsetNumber) 0, evec->vector[0].bytes, FALSE); + } + else + evec->n = real_len; + + /* Compress the result of the union and store in attr array */ + datum = FunctionCall2(&giststate->unionFn[i], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + gistcentryinit(giststate, i, ¢ry[i], datum, + NULL, NULL, (OffsetNumber) 0, + datumsize, FALSE, FALSE); + isnull[i] = FALSE; + attr[i] = centry[i].key; + } + } + + return index_form_tuple(giststate->tupdesc, attr, isnull); +} + + +/* + * Forms union of oldtup and addtup, if union == oldtup then return NULL + */ +IndexTuple +gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate) +{ + GistEntryVector *evec; + bool neednew = false; + bool isnull[INDEX_MAX_KEYS]; + Datum attr[INDEX_MAX_KEYS]; + GISTENTRY centry[INDEX_MAX_KEYS], + oldatt[INDEX_MAX_KEYS], + addatt[INDEX_MAX_KEYS], + *ev0p, + *ev1p; + bool oldisnull[INDEX_MAX_KEYS], + addisnull[INDEX_MAX_KEYS]; + IndexTuple newtup = NULL; + int i; + + evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); + evec->n = 2; + ev0p = &(evec->vector[0]); + ev1p = &(evec->vector[1]); + + gistDeCompressAtt(giststate, r, oldtup, NULL, + (OffsetNumber) 0, oldatt, oldisnull); + + gistDeCompressAtt(giststate, r, addtup, NULL, + (OffsetNumber) 0, addatt, addisnull); + + for (i = 0; i < r->rd_att->natts; i++) + { + if (oldisnull[i] && addisnull[i]) + { + attr[i] = (Datum) 0; + isnull[i] = TRUE; + } + else + { + Datum datum; + int datumsize; + + FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes, + addisnull[i], addatt[i].key, addatt[i].bytes); + + datum = FunctionCall2(&giststate->unionFn[i], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + if (oldisnull[i] || addisnull[i]) + { + if (oldisnull[i]) + neednew = true; + } + else + { + bool result; + + FunctionCall3(&giststate->equalFn[i], + ev0p->key, + datum, + PointerGetDatum(&result)); + + if (!result) + neednew = true; + } + + gistcentryinit(giststate, i, ¢ry[i], datum, + NULL, NULL, (OffsetNumber) 0, + datumsize, FALSE, FALSE); + + attr[i] = centry[i].key; + isnull[i] = FALSE; + } + } + + if (neednew) + { + /* need to update key */ + newtup = index_form_tuple(giststate->tupdesc, attr, isnull); + newtup->t_tid = oldtup->t_tid; + } + + return newtup; +} + +void +gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl) +{ + int lr; + + for (lr = 0; lr < 2; lr++) + { + OffsetNumber *entries; + int i; + Datum *attr; + int len, + *attrsize; + bool *isnull; + GistEntryVector *evec; + + if (lr) + { + attrsize = spl->spl_lattrsize; + attr = spl->spl_lattr; + len = spl->spl_nleft; + entries = spl->spl_left; + isnull = spl->spl_lisnull; + } + else + { + attrsize = spl->spl_rattrsize; + attr = spl->spl_rattr; + len = spl->spl_nright; + entries = spl->spl_right; + isnull = spl->spl_risnull; + } + + evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ); + + for (i = 1; i < r->rd_att->natts; i++) + { + int j; + Datum datum; + int datumsize; + int real_len; + + real_len = 0; + for (j = 0; j < len; j++) + { + bool IsNull; + + if (spl->spl_idgrp[entries[j]]) + continue; + datum = index_getattr(itvec[entries[j] - 1], i + 1, + giststate->tupdesc, &IsNull); + if (IsNull) + continue; + gistdentryinit(giststate, i, + &(evec->vector[real_len]), + datum, + NULL, NULL, (OffsetNumber) 0, + ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull), + FALSE, IsNull); + real_len++; + + } + + if (real_len == 0) + { + datum = (Datum) 0; + datumsize = 0; + isnull[i] = true; + } + else + { + /* + * evec->vector[0].bytes may be not defined, so form union + * with itself + */ + if (real_len == 1) + { + evec->n = 2; + memcpy(&(evec->vector[1]), &(evec->vector[0]), + sizeof(GISTENTRY)); + } + else + evec->n = real_len; + datum = FunctionCall2(&giststate->unionFn[i], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + isnull[i] = false; + } + + attr[i] = datum; + attrsize[i] = datumsize; + } + } +} + +/* + * find group in vector with equal value + */ +int +gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl) +{ + int i; + int curid = 1; + + /* + * first key is always not null (see gistinsert), so we may not check + * for nulls + */ + for (i = 0; i < spl->spl_nleft; i++) + { + int j; + int len; + bool result; + + if (spl->spl_idgrp[spl->spl_left[i]]) + continue; + len = 0; + /* find all equal value in right part */ + for (j = 0; j < spl->spl_nright; j++) + { + if (spl->spl_idgrp[spl->spl_right[j]]) + continue; + FunctionCall3(&giststate->equalFn[0], + valvec[spl->spl_left[i]].key, + valvec[spl->spl_right[j]].key, + PointerGetDatum(&result)); + if (result) + { + spl->spl_idgrp[spl->spl_right[j]] = curid; + len++; + } + } + /* find all other equal value in left part */ + if (len) + { + /* add current val to list of equal values */ + spl->spl_idgrp[spl->spl_left[i]] = curid; + /* searching .. */ + for (j = i + 1; j < spl->spl_nleft; j++) + { + if (spl->spl_idgrp[spl->spl_left[j]]) + continue; + FunctionCall3(&giststate->equalFn[0], + valvec[spl->spl_left[i]].key, + valvec[spl->spl_left[j]].key, + PointerGetDatum(&result)); + if (result) + { + spl->spl_idgrp[spl->spl_left[j]] = curid; + len++; + } + } + spl->spl_ngrp[curid] = len + 1; + curid++; + } + } + + return curid; +} + +/* + * Insert equivalent tuples to left or right page with minimum + * penalty + */ +void +gistadjsubkey(Relation r, + IndexTuple *itup, /* contains compressed entry */ + int *len, + GIST_SPLITVEC *v, + GISTSTATE *giststate) +{ + int curlen; + OffsetNumber *curwpos; + GISTENTRY entry, + identry[INDEX_MAX_KEYS], + *ev0p, + *ev1p; + float lpenalty, + rpenalty; + GistEntryVector *evec; + int datumsize; + bool isnull[INDEX_MAX_KEYS]; + int i, + j; + + /* clear vectors */ + curlen = v->spl_nleft; + curwpos = v->spl_left; + for (i = 0; i < v->spl_nleft; i++) + { + if (v->spl_idgrp[v->spl_left[i]] == 0) + { + *curwpos = v->spl_left[i]; + curwpos++; + } + else + curlen--; + } + v->spl_nleft = curlen; + + curlen = v->spl_nright; + curwpos = v->spl_right; + for (i = 0; i < v->spl_nright; i++) + { + if (v->spl_idgrp[v->spl_right[i]] == 0) + { + *curwpos = v->spl_right[i]; + curwpos++; + } + else + curlen--; + } + v->spl_nright = curlen; + + evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); + evec->n = 2; + ev0p = &(evec->vector[0]); + ev1p = &(evec->vector[1]); + + /* add equivalent tuple */ + for (i = 0; i < *len; i++) + { + Datum datum; + + if (v->spl_idgrp[i + 1] == 0) /* already inserted */ + continue; + gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0, + identry, isnull); + + v->spl_ngrp[v->spl_idgrp[i + 1]]--; + if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 && + (v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED) + { + /* force last in group */ + rpenalty = 1.0; + lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0; + } + else + { + /* where? */ + for (j = 1; j < r->rd_att->natts; j++) + { + gistentryinit(entry, v->spl_lattr[j], r, NULL, + (OffsetNumber) 0, v->spl_lattrsize[j], FALSE); + gistpenalty(giststate, j, &entry, v->spl_lisnull[j], + &identry[j], isnull[j], &lpenalty); + + gistentryinit(entry, v->spl_rattr[j], r, NULL, + (OffsetNumber) 0, v->spl_rattrsize[j], FALSE); + gistpenalty(giststate, j, &entry, v->spl_risnull[j], + &identry[j], isnull[j], &rpenalty); + + if (lpenalty != rpenalty) + break; + } + } + + /* + * add + * XXX: refactor this to avoid duplicating code + */ + if (lpenalty < rpenalty) + { + v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED; + v->spl_left[v->spl_nleft] = i + 1; + v->spl_nleft++; + for (j = 1; j < r->rd_att->natts; j++) + { + if (isnull[j] && v->spl_lisnull[j]) + { + v->spl_lattr[j] = (Datum) 0; + v->spl_lattrsize[j] = 0; + } + else + { + FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j], + isnull[j], identry[j].key, identry[j].bytes); + + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + v->spl_lattr[j] = datum; + v->spl_lattrsize[j] = datumsize; + v->spl_lisnull[j] = false; + } + } + } + else + { + v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED; + v->spl_right[v->spl_nright] = i + 1; + v->spl_nright++; + for (j = 1; j < r->rd_att->natts; j++) + { + if (isnull[j] && v->spl_risnull[j]) + { + v->spl_rattr[j] = (Datum) 0; + v->spl_rattrsize[j] = 0; + } + else + { + FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j], + isnull[j], identry[j].key, identry[j].bytes); + + datum = FunctionCall2(&giststate->unionFn[j], + PointerGetDatum(evec), + PointerGetDatum(&datumsize)); + + v->spl_rattr[j] = datum; + v->spl_rattrsize[j] = datumsize; + v->spl_risnull[j] = false; + } + } + } + } +} + +/* + * find entry with lowest penalty + */ +OffsetNumber +gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ + GISTSTATE *giststate) +{ + OffsetNumber maxoff; + OffsetNumber i; + OffsetNumber which; + float sum_grow, + which_grow[INDEX_MAX_KEYS]; + GISTENTRY entry, + identry[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + + maxoff = PageGetMaxOffsetNumber(p); + *which_grow = -1.0; + which = -1; + sum_grow = 1; + gistDeCompressAtt(giststate, r, + it, NULL, (OffsetNumber) 0, + identry, isnull); + + for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) + { + int j; + IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); + + sum_grow = 0; + for (j = 0; j < r->rd_att->natts; j++) + { + Datum datum; + float usize; + bool IsNull; + + datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); + gistdentryinit(giststate, j, &entry, datum, r, p, i, + ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), + FALSE, IsNull); + gistpenalty(giststate, j, &entry, IsNull, + &identry[j], isnull[j], &usize); + + if (which_grow[j] < 0 || usize < which_grow[j]) + { + which = i; + which_grow[j] = usize; + if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber) + which_grow[j + 1] = -1; + sum_grow += which_grow[j]; + } + else if (which_grow[j] == usize) + sum_grow += usize; + else + { + sum_grow = 1; + break; + } + } + } + + return which; +} + +/* + * initialize a GiST entry with a decompressed version of key + */ +void +gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, + Datum k, Relation r, Page pg, OffsetNumber o, + int b, bool l, bool isNull) +{ + if (b && !isNull) + { + GISTENTRY *dep; + + gistentryinit(*e, k, r, pg, o, b, l); + dep = (GISTENTRY *) + DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], + PointerGetDatum(e))); + /* decompressFn may just return the given pointer */ + if (dep != e) + gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, + dep->bytes, dep->leafkey); + } + else + gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); +} + + +/* + * initialize a GiST entry with a compressed version of key + */ +void +gistcentryinit(GISTSTATE *giststate, int nkey, + GISTENTRY *e, Datum k, Relation r, + Page pg, OffsetNumber o, int b, bool l, bool isNull) +{ + if (!isNull) + { + GISTENTRY *cep; + + gistentryinit(*e, k, r, pg, o, b, l); + cep = (GISTENTRY *) + DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], + PointerGetDatum(e))); + /* compressFn may just return the given pointer */ + if (cep != e) + gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, + cep->bytes, cep->leafkey); + } + else + gistentryinit(*e, (Datum) 0, r, pg, o, 0, l); +} + +IndexTuple +gistFormTuple(GISTSTATE *giststate, Relation r, + Datum attdata[], int datumsize[], bool isnull[]) +{ + GISTENTRY centry[INDEX_MAX_KEYS]; + Datum compatt[INDEX_MAX_KEYS]; + int i; + + for (i = 0; i < r->rd_att->natts; i++) + { + if (isnull[i]) + compatt[i] = (Datum) 0; + else + { + gistcentryinit(giststate, i, ¢ry[i], attdata[i], + NULL, NULL, (OffsetNumber) 0, + datumsize[i], FALSE, FALSE); + compatt[i] = centry[i].key; + } + } + + return index_form_tuple(giststate->tupdesc, compatt, isnull); +} + +void +gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, + OffsetNumber o, GISTENTRY *attdata, bool *isnull) +{ + int i; + + for (i = 0; i < r->rd_att->natts; i++) + { + Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); + gistdentryinit(giststate, i, &attdata[i], + datum, r, p, o, + ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), + FALSE, isnull[i]); + } +} + +static void +gistpenalty(GISTSTATE *giststate, int attno, + GISTENTRY *key1, bool isNull1, + GISTENTRY *key2, bool isNull2, float *penalty) +{ + if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2)) + *penalty = 0.0; + else + FunctionCall3(&giststate->penaltyFn[attno], + PointerGetDatum(key1), + PointerGetDatum(key2), + PointerGetDatum(penalty)); +} + +void +GISTInitBuffer(Buffer b, uint32 f) +{ + GISTPageOpaque opaque; + Page page; + Size pageSize; + + pageSize = BufferGetPageSize(b); + page = BufferGetPage(b); + PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); + + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + opaque->flags = f; +} + diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c new file mode 100644 index 0000000000..b99ab24761 --- /dev/null +++ b/src/backend/access/gist/gistxlog.c @@ -0,0 +1,628 @@ +/*------------------------------------------------------------------------- + * + * gistxlog.c + * WAL replay logic for GiST. + * + * + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $ + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/genam.h" +#include "access/gist_private.h" +#include "access/gistscan.h" +#include "access/heapam.h" +#include "catalog/index.h" +#include "commands/vacuum.h" +#include "miscadmin.h" +#include "utils/memutils.h" + +typedef struct { + gistxlogEntryUpdate *data; + int len; + IndexTuple *itup; + BlockNumber *path; +} EntryUpdateRecord; + +typedef struct { + gistxlogPage *header; + OffsetNumber *offnum; + + /* to work with */ + Page page; + Buffer buffer; + bool is_ok; +} NewPage; + +typedef struct { + gistxlogPageSplit *data; + NewPage *page; + IndexTuple *itup; + BlockNumber *path; +} PageSplitRecord; + +/* track for incomplete inserts, idea was taken from nbtxlog.c */ + +typedef struct gistIncompleteInsert { + RelFileNode node; + ItemPointerData key; + int lenblk; + BlockNumber *blkno; + int pathlen; + BlockNumber *path; +} gistIncompleteInsert; + + +MemoryContext opCtx; +MemoryContext insertCtx; +static List *incomplete_inserts; + + +#define ItemPointerEQ( a, b ) \ + ( \ + ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \ + ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \ + ) + +static void +pushIncompleteInsert(RelFileNode node, ItemPointerData key, + BlockNumber *blkno, int lenblk, + BlockNumber *path, int pathlen, + PageSplitRecord *xlinfo /* to extract blkno info */ ) { + MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx); + gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) ); + + ninsert->node = node; + ninsert->key = key; + + if ( lenblk && blkno ) { + ninsert->lenblk = lenblk; + ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk ); + memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk); + } else { + int i; + + Assert( xlinfo ); + ninsert->lenblk = xlinfo->data->npage; + ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk ); + for(i=0;i<ninsert->lenblk;i++) + ninsert->blkno[i] = xlinfo->page[i].header->blkno; + } + Assert( ninsert->lenblk>0 ); + + if ( path && ninsert->pathlen ) { + ninsert->pathlen = pathlen; + ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen ); + memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen); + } else { + ninsert->pathlen = 0; + ninsert->path = NULL; + } + + incomplete_inserts = lappend(incomplete_inserts, ninsert); + MemoryContextSwitchTo(oldCxt); +} + +static void +forgetIncompleteInsert(RelFileNode node, ItemPointerData key) { + ListCell *l; + + foreach(l, incomplete_inserts) { + gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l); + + if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) { + + /* found */ + if ( insert->path ) pfree( insert->path ); + pfree( insert->blkno ); + incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); + pfree( insert ); + break; + } + } +} + +static void +decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) { + char *begin = XLogRecGetData(record), *ptr; + int i=0, addpath=0; + + decoded->data = (gistxlogEntryUpdate*)begin; + + if ( decoded->data->pathlen ) { + addpath = sizeof(BlockNumber) * decoded->data->pathlen; + decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); + } else + decoded->path = NULL; + + decoded->len=0; + ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath; + while( ptr - begin < record->xl_len ) { + decoded->len++; + ptr += IndexTupleSize( (IndexTuple)ptr ); + } + + decoded->itup=(IndexTuple*)palloc( sizeof( IndexTuple ) * decoded->len ); + + ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath; + while( ptr - begin < record->xl_len ) { + decoded->itup[i] = (IndexTuple)ptr; + ptr += IndexTupleSize( decoded->itup[i] ); + i++; + } +} + + +static void +gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { + EntryUpdateRecord xlrec; + Relation reln; + Buffer buffer; + Page page; + + decodeEntryUpdateRecord( &xlrec, record ); + + reln = XLogOpenRelation(xlrec.data->node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer(false, reln, xlrec.data->blkno); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistRedoEntryUpdateRecord: block unfound"); + page = (Page) BufferGetPage(buffer); + + if ( isnewroot ) { + if ( !PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)) ) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + } else { + if ( PageIsNew((PageHeader) page) ) + elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page"); + if (XLByteLE(lsn, PageGetLSN(page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + } + + if ( isnewroot ) + GISTInitBuffer(buffer, 0); + else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) + PageIndexTupleDelete(page, xlrec.data->todeleteoffnum); + + /* add tuples */ + if ( xlrec.len > 0 ) { + OffsetNumber off = (PageIsEmpty(page)) ? + FirstOffsetNumber + : + OffsetNumberNext(PageGetMaxOffsetNumber(page)); + + gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off); + } + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + + if ( ItemPointerIsValid( &(xlrec.data->key) ) ) { + if ( incomplete_inserts != NIL ) + forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); + + if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO ) + pushIncompleteInsert(xlrec.data->node, xlrec.data->key, + &(xlrec.data->blkno), 1, + xlrec.path, xlrec.data->pathlen, + NULL); + } +} + +static void +decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) { + char *begin = XLogRecGetData(record), *ptr; + int i=0, addpath = 0; + + decoded->data = (gistxlogPageSplit*)begin; + decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage ); + decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup ); + + if ( decoded->data->pathlen ) { + addpath = sizeof(BlockNumber) * decoded->data->pathlen; + decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate )); + } else + decoded->path = NULL; + + ptr=begin+sizeof( gistxlogPageSplit ) + addpath; + for(i=0;i<decoded->data->nitup;i++) { + Assert( ptr - begin < record->xl_len ); + decoded->itup[i] = (IndexTuple)ptr; + ptr += IndexTupleSize( decoded->itup[i] ); + } + + for(i=0;i<decoded->data->npage;i++) { + Assert( ptr - begin < record->xl_len ); + decoded->page[i].header = (gistxlogPage*)ptr; + ptr += sizeof(gistxlogPage); + + Assert( ptr - begin < record->xl_len ); + decoded->page[i].offnum = (OffsetNumber*)ptr; + ptr += MAXALIGN( sizeof(OffsetNumber) * decoded->page[i].header->num ); + } +} + +static void +gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) { + PageSplitRecord xlrec; + Relation reln; + Buffer buffer; + Page page; + int i, len=0; + IndexTuple *itup, *institup; + GISTPageOpaque opaque; + bool release=true; + + decodePageSplitRecord( &xlrec, record ); + + reln = XLogOpenRelation(xlrec.data->node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistRedoEntryUpdateRecord: block unfound"); + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page"); + + if (XLByteLE(lsn, PageGetLSN(page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + + if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber ) + PageIndexTupleDelete(page, xlrec.data->todeleteoffnum); + + itup = gistextractbuffer(buffer, &len); + itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup); + institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len ); + opaque = (GISTPageOpaque) PageGetSpecialPointer(page); + + for(i=0;i<xlrec.data->npage;i++) { + int j; + NewPage *newpage = xlrec.page + i; + + /* prepare itup vector */ + for(j=0;j<newpage->header->num;j++) + institup[j] = itup[ newpage->offnum[j] - 1 ]; + + if ( newpage->header->blkno == xlrec.data->origblkno ) { + /* IncrBufferRefCount(buffer); */ + newpage->page = (Page) PageGetTempPage(page, sizeof(GISTPageOpaqueData)); + newpage->buffer = buffer; + newpage->is_ok=false; + } else { + newpage->buffer = XLogReadBuffer(true, reln, newpage->header->blkno); + if (!BufferIsValid(newpage->buffer)) + elog(PANIC, "gistRedoPageSplitRecord: lost page"); + newpage->page = (Page) BufferGetPage(newpage->buffer); + if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + newpage->is_ok=true; + continue; /* good page */ + } else { + newpage->is_ok=false; + GISTInitBuffer(newpage->buffer, opaque->flags & F_LEAF); + } + } + gistfillbuffer(reln, newpage->page, institup, newpage->header->num, FirstOffsetNumber); + } + + for(i=0;i<xlrec.data->npage;i++) { + NewPage *newpage = xlrec.page + i; + + if ( newpage->is_ok ) + continue; + + if ( newpage->header->blkno == xlrec.data->origblkno ) { + PageRestoreTempPage(newpage->page, page); + release = false; + } + + PageSetLSN(newpage->page, lsn); + PageSetTLI(newpage->page, ThisTimeLineID); + LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(newpage->buffer); + } + + if ( release ) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + } + + if ( ItemPointerIsValid( &(xlrec.data->key) ) ) { + if ( incomplete_inserts != NIL ) + forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); + + pushIncompleteInsert(xlrec.data->node, xlrec.data->key, + NULL, 0, + xlrec.path, xlrec.data->pathlen, + &xlrec); + } +} + +static void +gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) { + RelFileNode *node = (RelFileNode*)XLogRecGetData(record); + Relation reln; + Buffer buffer; + Page page; + + reln = XLogOpenRelation(*node); + if (!RelationIsValid(reln)) + return; + buffer = XLogReadBuffer( true, reln, GIST_ROOT_BLKNO); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistRedoCreateIndex: block unfound"); + page = (Page) BufferGetPage(buffer); + + if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + + GISTInitBuffer(buffer, F_LEAF); + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); +} + +void +gist_redo(XLogRecPtr lsn, XLogRecord *record) +{ + uint8 info = record->xl_info & ~XLR_INFO_MASK; + + MemoryContext oldCxt; + oldCxt = MemoryContextSwitchTo(opCtx); + switch (info) { + case XLOG_GIST_ENTRY_UPDATE: + case XLOG_GIST_ENTRY_DELETE: + gistRedoEntryUpdateRecord(lsn, record,false); + break; + case XLOG_GIST_NEW_ROOT: + gistRedoEntryUpdateRecord(lsn, record,true); + break; + case XLOG_GIST_PAGE_SPLIT: + gistRedoPageSplitRecord(lsn, record); + break; + case XLOG_GIST_CREATE_INDEX: + gistRedoCreateIndex(lsn, record); + break; + case XLOG_GIST_INSERT_COMPLETE: + forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node, + ((gistxlogInsertComplete*)XLogRecGetData(record))->key ); + break; + default: + elog(PANIC, "gist_redo: unknown op code %u", info); + } + + MemoryContextSwitchTo(oldCxt); + MemoryContextReset(opCtx); +} + +static void +out_target(char *buf, RelFileNode node, ItemPointerData key) +{ + sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u", + node.spcNode, node.dbNode, node.relNode, + ItemPointerGetBlockNumber(&key), + ItemPointerGetOffsetNumber(&key)); +} + +static void +out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) { + out_target(buf, xlrec->node, xlrec->key); + sprintf(buf + strlen(buf), "; block number %u; update offset %u;", + xlrec->blkno, xlrec->todeleteoffnum); +} + +static void +out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) { + strcat(buf, "page_split: "); + out_target(buf, xlrec->node, xlrec->key); + sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages", + xlrec->origblkno, xlrec->todeleteoffnum, + xlrec->nitup, xlrec->npage); +} + +void +gist_desc(char *buf, uint8 xl_info, char *rec) +{ + uint8 info = xl_info & ~XLR_INFO_MASK; + + switch (info) { + case XLOG_GIST_ENTRY_UPDATE: + strcat(buf, "entry_update: "); + out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec); + break; + case XLOG_GIST_ENTRY_DELETE: + strcat(buf, "entry_delete: "); + out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec); + break; + case XLOG_GIST_NEW_ROOT: + strcat(buf, "new_root: "); + out_target(buf, ((gistxlogEntryUpdate*)rec)->node, ((gistxlogEntryUpdate*)rec)->key); + break; + case XLOG_GIST_PAGE_SPLIT: + out_gistxlogPageSplit(buf, (gistxlogPageSplit*)rec); + break; + case XLOG_GIST_CREATE_INDEX: + sprintf(buf + strlen(buf), "create_index: rel %u/%u/%u", + ((RelFileNode*)rec)->spcNode, + ((RelFileNode*)rec)->dbNode, + ((RelFileNode*)rec)->relNode); + break; + case XLOG_GIST_INSERT_COMPLETE: + strcat(buf, "insert_complete: "); + out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key); + default: + elog(PANIC, "gist_desc: unknown op code %u", info); + } +} + + +#ifdef GIST_INCOMPLETE_INSERT +static void +gistContinueInsert(gistIncompleteInsert *insert) { + GISTSTATE giststate; + GISTInsertState state; + int i; + MemoryContext oldCxt; + oldCxt = MemoryContextSwitchTo(opCtx); + + state.r = XLogOpenRelation(insert->node); + if (!RelationIsValid(state.r)) + return; + + initGISTstate(&giststate, state.r); + + state.needInsertComplete=false; + ItemPointerSetInvalid( &(state.key) ); + state.path=NULL; + state.pathlen=0; + state.xlog_mode = true; + + /* form union tuples */ + state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk); + state.ituplen = insert->lenblk; + for(i=0;i<insert->lenblk;i++) { + int len=0; + IndexTuple *itup; + Buffer buffer; + Page page; + + buffer = XLogReadBuffer(false, state.r, insert->blkno[i]); + if (!BufferIsValid(buffer)) + elog(PANIC, "gistContinueInsert: block unfound"); + page = (Page) BufferGetPage(buffer); + if ( PageIsNew((PageHeader)page) ) + elog(PANIC, "gistContinueInsert: uninitialized page"); + + itup = gistextractbuffer(buffer, &len); + state.itup[i] = gistunion(state.r, itup, len, &giststate); + + ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber ); + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + } + + if ( insert->pathlen==0 ) { + /*it was split root, so we should only make new root*/ + gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true); + MemoryContextSwitchTo(oldCxt); + MemoryContextReset(opCtx); + return; + } + + /* form stack */ + state.stack=NULL; + for(i=0;i<insert->pathlen;i++) { + int j,len=0; + IndexTuple *itup; + GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) ); + + top->blkno = insert->path[i]; + top->buffer = XLogReadBuffer(false, state.r, top->blkno); + if (!BufferIsValid(top->buffer)) + elog(PANIC, "gistContinueInsert: block unfound"); + top->page = (Page) BufferGetPage(top->buffer); + if ( PageIsNew((PageHeader)(top->page)) ) + elog(PANIC, "gistContinueInsert: uninitialized page"); + + top->todelete = false; + + /* find childoffnum */ + itup = gistextractbuffer(top->buffer, &len); + top->childoffnum=InvalidOffsetNumber; + for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) { + BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) ); + + if ( i==0 ) { + int k; + for(k=0;k<insert->lenblk;k++) + if ( insert->blkno[k] == blkno ) { + top->childoffnum = j+1; + break; + } + } else if ( insert->path[i-1]==blkno ) + top->childoffnum = j+1; + } + + if ( top->childoffnum==InvalidOffsetNumber ) { + elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes"); + return; + } + + if ( i==0 ) + PageIndexTupleDelete(top->page, top->childoffnum); + + /* install item on right place in stack */ + top->parent=NULL; + if ( state.stack ) { + GISTInsertStack *ptr = state.stack; + while( ptr->parent ) + ptr = ptr->parent; + ptr->parent=top; + } else + state.stack = top; + } + + /* Good. Now we can continue insert */ + + gistmakedeal(&state, &giststate); + + MemoryContextSwitchTo(oldCxt); + MemoryContextReset(opCtx); +} +#endif + +void +gist_xlog_startup(void) { + incomplete_inserts=NIL; + insertCtx = AllocSetContextCreate(CurrentMemoryContext, + "GiST insert in xlog temporary context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + opCtx = createTempGistContext(); +} + +void +gist_xlog_cleanup(void) { + ListCell *l; + + foreach(l, incomplete_inserts) { + gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l); + char buf[1024]; + + *buf='\0'; + out_target(buf, insert->node, insert->key); + elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf); +#ifdef GIST_INCOMPLETE_INSERT + gistContinueInsert(insert); +#endif + } + MemoryContextDelete(opCtx); + MemoryContextDelete(insertCtx); +} + |