diff options
Diffstat (limited to 'storage/ndb/src')
-rw-r--r-- | storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp | 315 | ||||
-rw-r--r-- | storage/ndb/src/kernel/vm/SuperPool.cpp | 442 | ||||
-rw-r--r-- | storage/ndb/src/kernel/vm/SuperPool.hpp | 561 | ||||
-rw-r--r-- | storage/ndb/src/kernel/vm/testSuperPool.cpp | 220 |
4 files changed, 1538 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp new file mode 100644 index 00000000000..396404faa8c --- /dev/null +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp @@ -0,0 +1,315 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#define DBTUP_C +#include "Dbtup.hpp" +#include <signaldata/AccScan.hpp> +#include <signaldata/NextScan.hpp> + +#undef jam +#undef jamEntry +#define jam() { jamLine(32000 + __LINE__); } +#define jamEntry() { jamEntryLine(32000 + __LINE__); } + +void +Dbtup::execACC_SCANREQ(Signal* signal) +{ + jamEntry(); + const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr(); + const AccScanReq* const req = &reqCopy; + ScanOpPtr scanPtr; + scanPtr.i = RNIL; + do { + // find table and fragments + TablerecPtr tablePtr; + tablePtr.i = req->tableId; + ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec); + FragrecordPtr fragPtr[2]; + Uint32 fragId = req->fragmentNo << 1; + fragPtr[0].i = fragPtr[1].i = RNIL; + getFragmentrec(fragPtr[0], fragId | 0, tablePtr.p); + getFragmentrec(fragPtr[1], fragId | 1, tablePtr.p); + ndbrequire(fragPtr[0].i != RNIL && fragPtr[1].i != RNIL); + Fragrecord& frag = *fragPtr[0].p; + // seize from pool and link to per-fragment list + if (! frag.m_scanList.seize(scanPtr)) { + jam(); + break; + } + new (scanPtr.p) ScanOp(); + ScanOp& scan = *scanPtr.p; + scan.m_state = ScanOp::First; + scan.m_userPtr = req->senderData; + scan.m_userRef = req->senderRef; + scan.m_tableId = tablePtr.i; + scan.m_fragId = frag.fragmentId; + scan.m_fragPtrI[0] = fragPtr[0].i; + scan.m_fragPtrI[1] = fragPtr[1].i; + scan.m_transId1 = req->transId1; + scan.m_transId2 = req->transId2; + // conf + AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend(); + conf->scanPtr = req->senderData; + conf->accPtr = scanPtr.i; + conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT; + sendSignal(req->senderRef, GSN_ACC_SCANCONF, signal, + AccScanConf::SignalLength, JBB); + return; + } while (0); + if (scanPtr.i != RNIL) { + jam(); + releaseScanOp(scanPtr); + } + // LQH does not handle REF + signal->theData[0] = 0x313; + sendSignal(req->senderRef, GSN_ACC_SCANREF, signal, 1, JBB); +} + +void +Dbtup::execNEXT_SCANREQ(Signal* signal) +{ + jamEntry(); + const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr(); + const NextScanReq* const req = &reqCopy; + ScanOpPtr scanPtr; + c_scanOpPool.getPtr(scanPtr, req->accPtr); + ScanOp& scan = *scanPtr.p; + FragrecordPtr fragPtr; + fragPtr.i = scan.m_fragPtrI[0]; + ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); + Fragrecord& frag = *fragPtr.p; + switch (req->scanFlag) { + case NextScanReq::ZSCAN_NEXT: + jam(); + break; + case NextScanReq::ZSCAN_NEXT_COMMIT: + jam(); + break; + case NextScanReq::ZSCAN_COMMIT: + jam(); + { + NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); + conf->scanPtr = scan.m_userPtr; + unsigned signalLength = 1; + sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, + signal, signalLength, JBB); + return; + } + break; + case NextScanReq::ZSCAN_CLOSE: + jam(); + scanClose(signal, scanPtr); + return; + case NextScanReq::ZSCAN_NEXT_ABORT: + jam(); + default: + jam(); + ndbrequire(false); + break; + } + // start looking for next scan result + AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend(); + checkReq->accPtr = scanPtr.i; + checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP; + EXECUTE_DIRECT(DBTUP, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength); + jamEntry(); +} + +void +Dbtup::execACC_CHECK_SCAN(Signal* signal) +{ + jamEntry(); + const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr(); + const AccCheckScan* const req = &reqCopy; + ScanOpPtr scanPtr; + c_scanOpPool.getPtr(scanPtr, req->accPtr); + ScanOp& scan = *scanPtr.p; + FragrecordPtr fragPtr; + fragPtr.i = scan.m_fragPtrI[0]; + ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); + Fragrecord& frag = *fragPtr.p; + if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) { + jam(); + signal->theData[0] = scan.m_userPtr; + signal->theData[1] = true; + EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); + jamEntry(); + return; + } + if (scan.m_state == ScanOp::First) { + jam(); + scanFirst(signal, scanPtr); + } + if (scan.m_state == ScanOp::Next) { + jam(); + scanNext(signal, scanPtr); + } + if (scan.m_state == ScanOp::Locked) { + jam(); + const PagePos& pos = scan.m_scanPos; + NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); + conf->scanPtr = scan.m_userPtr; + conf->accOperationPtr = (Uint32)-1; // no lock returned + conf->fragId = frag.fragmentId | pos.m_fragBit; + conf->localKey[0] = (pos.m_pageId << MAX_TUPLES_BITS) | + (pos.m_tupleNo << 1); + conf->localKey[1] = 0; + conf->localKeyLength = 1; + unsigned signalLength = 6; + Uint32 blockNo = refToBlock(scan.m_userRef); + EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength); + jamEntry(); + // next time look for next entry + scan.m_state = ScanOp::Next; + return; + } + if (scan.m_state == ScanOp::Last || + scan.m_state == ScanOp::Invalid) { + jam(); + NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); + conf->scanPtr = scan.m_userPtr; + conf->accOperationPtr = RNIL; + conf->fragId = RNIL; + unsigned signalLength = 3; + sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, + signal, signalLength, JBB); + return; + } + ndbrequire(false); +} + +void +Dbtup::scanFirst(Signal* signal, ScanOpPtr scanPtr) +{ + ScanOp& scan = *scanPtr.p; + // set to first fragment, first page, first tuple + PagePos& pos = scan.m_scanPos; + pos.m_fragId = scan.m_fragId; + pos.m_fragBit = 0; + pos.m_pageId = 0; + pos.m_tupleNo = 0; + // just before + pos.m_match = false; + // let scanNext() do the work + scan.m_state = ScanOp::Next; +} + +// TODO optimize this + index build +void +Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) +{ + ScanOp& scan = *scanPtr.p; + PagePos& pos = scan.m_scanPos; + TablerecPtr tablePtr; + tablePtr.i = scan.m_tableId; + ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec); + while (true) { + // TODO time-slice here after X loops + jam(); + // get fragment + if (pos.m_fragBit == 2) { + jam(); + scan.m_state = ScanOp::Last; + break; + } + ndbrequire(pos.m_fragBit <= 1); + FragrecordPtr fragPtr; + fragPtr.i = scan.m_fragPtrI[pos.m_fragBit]; + ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); + Fragrecord& frag = *fragPtr.p; + // get page + PagePtr pagePtr; + if (pos.m_pageId >= frag.noOfPages) { + jam(); + pos.m_fragBit++; + pos.m_pageId = 0; + pos.m_tupleNo = 0; + pos.m_match = false; + continue; + } + Uint32 realPageId = getRealpid(fragPtr.p, pos.m_pageId); + pagePtr.i = realPageId; + ptrCheckGuard(pagePtr, cnoOfPage, page); + const Uint32 pageState = pagePtr.p->pageWord[ZPAGE_STATE_POS]; + if (pageState != ZTH_MM_FREE && + pageState != ZTH_MM_FULL) { + jam(); + pos.m_pageId++; + pos.m_tupleNo = 0; + pos.m_match = false; + continue; + } + // get next tuple + if (pos.m_match) + pos.m_tupleNo++; + pos.m_match = true; + const Uint32 tupheadsize = tablePtr.p->tupheadsize; + Uint32 pageOffset = ZPAGE_HEADER_SIZE + pos.m_tupleNo * tupheadsize; + if (pageOffset + tupheadsize > ZWORDS_ON_PAGE) { + jam(); + pos.m_pageId++; + pos.m_tupleNo = 0; + pos.m_match = false; + continue; + } + // skip over free tuple + bool isFree = false; + if (pageState == ZTH_MM_FREE) { + jam(); + if ((pagePtr.p->pageWord[pageOffset] >> 16) == tupheadsize) { + Uint32 nextTuple = pagePtr.p->pageWord[ZFREELIST_HEADER_POS] >> 16; + while (nextTuple != 0) { + jam(); + if (nextTuple == pageOffset) { + jam(); + isFree = true; + break; + } + nextTuple = pagePtr.p->pageWord[nextTuple] & 0xffff; + } + } + } + if (isFree) { + jam(); + continue; + } + // TODO check for operation and return latest in own tx + scan.m_state = ScanOp::Locked; + break; + } +} + +void +Dbtup::scanClose(Signal* signal, ScanOpPtr scanPtr) +{ + NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); + conf->scanPtr = scanPtr.p->m_userPtr; + conf->accOperationPtr = RNIL; + conf->fragId = RNIL; + unsigned signalLength = 3; + sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, + signal, signalLength, JBB); + releaseScanOp(scanPtr); +} + +void +Dbtup::releaseScanOp(ScanOpPtr& scanPtr) +{ + FragrecordPtr fragPtr; + fragPtr.i = scanPtr.p->m_fragPtrI[0]; + ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord); + fragPtr.p->m_scanList.release(scanPtr); +} diff --git a/storage/ndb/src/kernel/vm/SuperPool.cpp b/storage/ndb/src/kernel/vm/SuperPool.cpp new file mode 100644 index 00000000000..65e5dd99629 --- /dev/null +++ b/storage/ndb/src/kernel/vm/SuperPool.cpp @@ -0,0 +1,442 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <ndb_global.h> +#include "SuperPool.hpp" + +// SuperPool + +SuperPool::SuperPool(Uint32 pageSize, Uint32 pageBits) : + m_pageSize(SP_ALIGN_SIZE(pageSize, SP_ALIGN)), + m_pageBits(pageBits), + m_memRoot(0), + m_pageEnt(0), + m_typeCheck(0), + m_typeSeq(0), + m_pageList(), + m_totalSize(0), + m_initSize(0), + m_incrSize(0), + m_maxSize(0) +{ + assert(5 <= pageBits <= 30); +} + +bool +SuperPool::init() +{ + return true; +} + +SuperPool::~SuperPool() +{ +} + +SuperPool::PageEnt::PageEnt() : + m_pageType(0), + m_freeRecI(RNIL), + m_useCount(0), + m_nextPageI(RNIL), + m_prevPageI(RNIL) +{ +} + +SuperPool::PageList::PageList() : + m_headPageI(RNIL), + m_tailPageI(RNIL), + m_pageCount(0) +{ +} + +SuperPool::PageList::PageList(PtrI pageI) : + m_headPageI(pageI), + m_tailPageI(pageI), + m_pageCount(1) +{ +} + +SuperPool::RecInfo::RecInfo(Uint32 recType, Uint32 recSize) : + m_recType(recType), + m_recSize(recSize), + m_maxUseCount(0), + m_currPageI(RNIL), + m_currFreeRecI(RNIL), + m_currUseCount(0), + m_totalUseCount(0), + m_totalRecCount(0), + m_freeList(), + m_activeList(), + m_fullList() +{ +} + +SuperPool::PtrI +SuperPool::getPageI(void* pageP) +{ + const Uint32 pageSize = m_pageSize; + const Uint32 pageBits = m_pageBits; + const Uint32 recBits = 32 - pageBits; + void* const memRoot = m_memRoot; + assert(pageP == SP_ALIGN_PTR(pageP, memRoot, pageSize)); + my_ptrdiff_t ipL = ((Uint8*)pageP - (Uint8*)memRoot) / pageSize; + Int32 ip = (Int32)ipL; + Int32 lim = 1 << (pageBits - 1); + assert(ip == ipL && -lim <= ip && ip < lim && ip != -1); + PtrI pageI = ip << recBits; + assert(pageP == getPageP(pageI)); + return pageI; +} + +void +SuperPool::movePages(PageList& pl1, PageList& pl2) +{ + const Uint32 recBits = 32 - m_pageBits; + if (pl1.m_pageCount != 0) { + if (pl2.m_pageCount != 0) { + PtrI pageI1 = pl1.m_tailPageI; + PtrI pageI2 = pl2.m_headPageI; + PageEnt& pe1 = getPageEnt(pageI1); + PageEnt& pe2 = getPageEnt(pageI2); + pe1.m_nextPageI = pageI2; + pe2.m_prevPageI = pageI1; + pl1.m_pageCount += pl2.m_pageCount; + } + } else { + pl1 = pl2; + } + pl2.m_headPageI = pl2.m_tailPageI = RNIL; + pl2.m_pageCount = 0; +} + +void +SuperPool::addHeadPage(PageList& pl, PtrI pageI) +{ + PageList pl2(pageI); + movePages(pl2, pl); + pl = pl2; +} + +void +SuperPool::addTailPage(PageList& pl, PtrI pageI) +{ + PageList pl2(pageI); + movePages(pl, pl2); +} + +void +SuperPool::removePage(PageList& pl, PtrI pageI) +{ + PageEnt& pe = getPageEnt(pageI); + PtrI pageI1 = pe.m_prevPageI; + PtrI pageI2 = pe.m_nextPageI; + if (pageI1 != RNIL) { + PageEnt& pe1 = getPageEnt(pageI1); + pe1.m_nextPageI = pageI2; + if (pageI2 != RNIL) { + PageEnt& pe2 = getPageEnt(pageI2); + pe2.m_prevPageI = pageI1; + } else { + pl.m_tailPageI = pageI1; + } + } else { + if (pageI2 != RNIL) { + PageEnt& pe2 = getPageEnt(pageI2); + pe2.m_prevPageI = pageI1; + pl.m_headPageI = pageI2; + } else { + pl.m_headPageI = pl.m_tailPageI = RNIL; + } + } + pe.m_prevPageI = pe.m_nextPageI = RNIL; + assert(pl.m_pageCount != 0); + pl.m_pageCount--; +} + +void +SuperPool::setCurrPage(RecInfo& ri, PtrI newPageI) +{ + PtrI oldPageI = ri.m_currPageI; + if (oldPageI != RNIL) { + // copy from cache + PageEnt& pe = getPageEnt(oldPageI); + pe.m_freeRecI = ri.m_currFreeRecI; + pe.m_useCount = ri.m_currUseCount; + // add to right list according to "pp2" policy + if (pe.m_useCount == 0) { + pe.m_pageType = 0; + addHeadPage(m_pageList, oldPageI); + ri.m_totalRecCount -= ri.m_maxUseCount; + } else if (pe.m_useCount < ri.m_maxUseCount) { + addHeadPage(ri.m_activeList, oldPageI); + } else { + addHeadPage(ri.m_fullList, oldPageI); + } + } + if (newPageI != RNIL) { + PageEnt& pe = getPageEnt(newPageI); + // copy to cache + ri.m_currPageI = newPageI; + ri.m_currFreeRecI = pe.m_freeRecI; + ri.m_currUseCount = pe.m_useCount; + // remove from right list + if (pe.m_useCount == 0) { + removePage(ri.m_freeList, newPageI); + } else if (pe.m_useCount < ri.m_maxUseCount) { + removePage(ri.m_activeList, newPageI); + } else { + removePage(ri.m_fullList, newPageI); + } + } else { + ri.m_currPageI = RNIL; + ri.m_currFreeRecI = RNIL; + ri.m_currUseCount = 0; + } +} + +bool +SuperPool::getAvailPage(RecInfo& ri) +{ + PtrI pageI; + if ((pageI = ri.m_activeList.m_headPageI) != RNIL || + (pageI = ri.m_freeList.m_headPageI) != RNIL || + (pageI = getFreePage(ri)) != RNIL) { + setCurrPage(ri, pageI); + return true; + } + return false; +} + +SuperPool::PtrI +SuperPool::getFreePage(RecInfo& ri) +{ + PtrI pageI; + if (m_pageList.m_pageCount != 0) { + pageI = m_pageList.m_headPageI; + removePage(m_pageList, pageI); + } else { + pageI = getNewPage(); + if (pageI == RNIL) + return RNIL; + } + void* pageP = getPageP(pageI); + // set up free record list + Uint32 maxUseCount = ri.m_maxUseCount; + Uint32 recSize = ri.m_recSize; + void* recP = (Uint8*)pageP; + Uint32 irNext = 1; + while (irNext < maxUseCount) { + *(Uint32*)recP = pageI | irNext; + recP = (Uint8*)recP + recSize; + irNext++; + } + *(Uint32*)recP = RNIL; + // add to total record count + ri.m_totalRecCount += maxUseCount; + // set up new page entry + PageEnt& pe = getPageEnt(pageI); + new (&pe) PageEnt(); + pe.m_pageType = ri.m_recType; + pe.m_freeRecI = pageI | 0; + pe.m_useCount = 0; + // set type check bits + setCheckBits(pageI, ri.m_recType); + // add to record pool free list + addHeadPage(ri.m_freeList, pageI); + return pageI; +} + +void +SuperPool::setSizes(size_t initSize, size_t incrSize, size_t maxSize) +{ + const Uint32 pageSize = m_pageSize; + m_initSize = SP_ALIGN_SIZE(initSize, pageSize); + m_incrSize = SP_ALIGN_SIZE(incrSize, pageSize); + m_maxSize = SP_ALIGN_SIZE(maxSize, pageSize); +} + +void +SuperPool::verify(RecInfo& ri) +{ + PageList* plList[3] = { &ri.m_freeList, &ri.m_activeList, &ri.m_fullList }; + for (int i = 0; i < 3; i++) { + PageList& pl = *plList[i]; + unsigned count = 0; + PtrI pageI = pl.m_headPageI; + while (pageI != RNIL) { + PageEnt& pe = getPageEnt(pageI); + PtrI pageI1 = pe.m_prevPageI; + PtrI pageI2 = pe.m_nextPageI; + if (count == 0) { + assert(pageI1 == RNIL); + } else { + assert(pageI1 != RNIL); + PageEnt& pe1 = getPageEnt(pageI1); + assert(pe1.m_nextPageI == pageI); + if (pageI2 != RNIL) { + PageEnt& pe2 = getPageEnt(pageI2); + assert(pe2.m_prevPageI == pageI); + } + } + pageI = pageI2; + count++; + } + assert(pl.m_pageCount == count); + } +} + +// HeapPool + +HeapPool::HeapPool(Uint32 pageSize, Uint32 pageBits) : + SuperPool(pageSize, pageBits), + m_areaHead(), + m_currArea(&m_areaHead), + m_lastArea(&m_areaHead), + m_mallocPart(4) +{ +} + +bool +HeapPool::init() +{ + const Uint32 pageBits = m_pageBits; + if (! SuperPool::init()) + return false;; + // allocate page entry array + Uint32 peBytes = (1 << pageBits) * sizeof(PageEnt); + m_pageEnt = static_cast<PageEnt*>(malloc(peBytes)); + if (m_pageEnt == 0) + return false; + memset(m_pageEnt, 0, peBytes); + // allocate type check array + Uint32 tcWords = 1 << (pageBits - (5 - SP_CHECK_LOG2)); + m_typeCheck = static_cast<Uint32*>(malloc(tcWords << 2)); + if (m_typeCheck == 0) + return false; + memset(m_typeCheck, 0, tcWords << 2); + // allocate initial data + assert(m_totalSize == 0); + if (! allocMoreData(m_initSize)) + return false; + return true; +} + +HeapPool::~HeapPool() +{ + free(m_pageEnt); + free(m_typeCheck); + Area* ap; + while ((ap = m_areaHead.m_nextArea) != 0) { + m_areaHead.m_nextArea = ap->m_nextArea; + free(ap->m_memory); + free(ap); + } +} + +HeapPool::Area::Area() : + m_nextArea(0), + m_firstPageI(RNIL), + m_currPage(0), + m_numPages(0), + m_memory(0) +{ +} + +SuperPool::PtrI +HeapPool::getNewPage() +{ + const Uint32 pageSize = m_pageSize; + const Uint32 pageBits = m_pageBits; + const Uint32 recBits= 32 - pageBits; + Area* ap = m_currArea; + if (ap->m_currPage == ap->m_numPages) { + // area is used up + if (ap->m_nextArea == 0) { + // todo dynamic increase + assert(m_incrSize == 0); + return RNIL; + } + ap = m_currArea = ap->m_nextArea; + } + assert(ap->m_currPage < ap->m_numPages); + PtrI pageI = ap->m_firstPageI; + Int32 ip = (Int32)pageI >> recBits; + ip += ap->m_currPage; + pageI = ip << recBits; + ap->m_currPage++; + return pageI; +} + +bool +HeapPool::allocMoreData(size_t size) +{ + const Uint32 pageSize = m_pageSize; + const Uint32 pageBits = m_pageBits; + const Uint32 recBits = 32 - pageBits; + const Uint32 incrSize = m_incrSize; + const Uint32 incrPages = incrSize / pageSize; + const Uint32 mallocPart = m_mallocPart; + size = SP_ALIGN_SIZE(size, pageSize); + if (incrSize != 0) + size = SP_ALIGN_SIZE(size, incrSize); + Uint32 needPages = size / pageSize; + while (needPages != 0) { + Uint32 wantPages = needPages; + if (incrPages != 0 && wantPages > incrPages) + wantPages = incrPages; + Uint32 tryPages = 0; + void* p1 = 0; + for (Uint32 i = mallocPart; i > 0 && p1 == 0; i--) { + // one page is usually wasted due to alignment to memory root + tryPages = ((wantPages + 1) * i) / mallocPart; + if (tryPages < 2) + break; + p1 = malloc(pageSize * tryPages); + } + if (p1 == 0) + return false; + if (m_memRoot == 0) { + // set memory root at first "big" alloc + // assume malloc header makes later ip = -1 impossible + m_memRoot = p1; + } + void* p2 = SP_ALIGN_PTR(p1, m_memRoot, pageSize); + Uint32 numPages = tryPages - (p1 != p2); + my_ptrdiff_t ipL = ((Uint8*)p2 - (Uint8*)m_memRoot) / pageSize; + Int32 ip = (Int32)ipL; + Int32 lim = 1 << (pageBits - 1); + if (! (ip == ipL && -lim <= ip && ip + numPages < lim)) { + free(p1); + return false; + } + assert(ip != -1); + PtrI pageI = ip << recBits; + needPages = (needPages >= numPages ? needPages - numPages : 0); + m_totalSize += numPages * pageSize; + // allocate new area + Area* ap = static_cast<Area*>(malloc(sizeof(Area))); + if (ap == 0) { + free(p1); + return false; + } + new (ap) Area(); + ap->m_firstPageI = pageI; + ap->m_numPages = numPages; + ap->m_memory = p1; + m_lastArea->m_nextArea = ap; + m_lastArea = ap; + } + return true; +} diff --git a/storage/ndb/src/kernel/vm/SuperPool.hpp b/storage/ndb/src/kernel/vm/SuperPool.hpp new file mode 100644 index 00000000000..157c75aa0d5 --- /dev/null +++ b/storage/ndb/src/kernel/vm/SuperPool.hpp @@ -0,0 +1,561 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef SUPER_POOL_HPP +#define SUPER_POOL_HPP + +#include <ndb_global.h> + +#include <pc.hpp> +#include <ErrorReporter.hpp> + +#define NDB_SP_VERIFY_LEVEL 1 + +/* + * SuperPool - super pool for record pools (abstract class) + * + * Documents SuperPool and RecordPool<T>. + * + * GENERAL + * + * A "super pool" is a shared pool of pages of fixed size. A "record + * pool" is a pool of records of fixed size. One super pool instance is + * used by any number of record pools to allocate their memory. + * A special case is a "page pool" where a record is a simple page, + * possibly smaller than super pool page. + * + * A record pool allocates memory in pages. Thus each used page is + * associated with one record pool and one record type. The records on + * a page form an array starting at start of page. Thus each record has + * an index within the page. Any last partial record which does not fit + * on the page is disregarded. + * + * I-VALUE + * + * The old "i-p" principle is kept. A reference to a super pool page or + * record is stored as an "i-value" from which the record pointer "p" is + * computed. In super pool the i-value is a Uint32 with two parts: + * + * - "ip" index of page within super pool (high pageBits) + * - "ir" index of record within page (low recBits) + * + * The translation between "ip" and page address is described in next + * section. Once page address is known, the record address is found + * from "ir" in the obvious way. + * + * The main advantage with i-value is that it can be verified. The + * level of verification depends on compile type (release, debug). + * + * - "v0" minimal sanity check + * - "v1" check record type matches page type, see below + * - "v2" check record is in use (not yet implemented) + * + * Another advantage of a 32-bit i-value is that it extends the space of + * 32-bit addressable records on a 64-bit platform. + * + * RNIL is 0xffffff00 and indicates NULL i-value. To avoid hitting RNIL + * it is required that pageBits <= 30 and that the maximum value of the + * range (2^pageBits-1) is not used. + * + * MEMORY ROOT + * + * This super pool requires a "memory root" i.e. a memory address such + * that the index of a page "ip" satisfies + * + * page address = memory root + (signed)ip * page size + * + * This is possible on most platforms, provided that the memory root and + * all pages are either on the heap or on the stack, in order to keep + * the size of "ip" reasonably small. + * + * The cast (signed)ip is done as integer of pageBits bits. "ip" has + * same sign bit as i-value "i" so (signed)ip = (Int32)i >> recBits. + * The RNIL restriction can be expressed as (signed)ip != -1. + * + * PAGE ENTRIES + * + * Each super pool page has a "page entry". It contains: + * + * - page type + * - i-value of first free record on page + * - page use count, to see if page can be freed + * - pointers (as i-values) to next and previous page in list + * + * Page entry cannot be stored on the page itself since this prevents + * aligning pages to OS block size and the use of BATs (don't ask) for + * page pools in NDB. For now the implementation provides an array of + * page entries with place for all (2^pageBits) entries. + * + * PAGE TYPE + * + * Page type is (in principle) unique to the record pool using the super + * pool. It is assigned in record pool constructor. Page type zero + * means that the page is free i.e. not allocated to a record pool. + * + * Each "i-p" conversion checks ("v1") that the record belongs to same + * pool as the page. This check is much more common than page or record + * allocation. To make it cache effective, there is a separate array of + * reduced "type bits" (computed from real type). + * + * FREE LISTS + * + * A record is either used or on the free list of the record pool. + * A page has a use count i.e. number of used records. When use count + * drops to zero the page can be returned to the super pool. This is + * not necessarily done at once, or ever. + * + * To make freeing pages feasible, the record pool free list has two + * levels. There are available pages (some free) and a singly linked + * free list within the page. A page allocated to record pool is on one + * of 4 lists: + * + * - free page list (all free, available) + * - active page list (some free, some used, available) + * - full page list (none free) + * - current page (list of 1), see below + * + * Some usage types (temporary pools) may never free records. They pay + * a small penalty for the extra overhead. + * + * RECORD POOL + * + * A pool of records which allocates its memory from a super pool + * instance specified in the constructor. There are 3 basic operations: + * + * - getPtr - translate i-value to pointer-to-record p + * - seize - allocate record + * - release - free record + * + * CURRENT PAGE + * + * getPtr is a fast computation which does not touch the page. For + * seize and release there is an optimization: + * + * Define "current page" as page of latest seize or release. Its page + * entry is cached under record pool instance. The page is removed from + * its normal list. Seize and release on current page are fast and + * avoid touching the page. The current page is used until + * + * - seize and current page is full + * - release and the page is not current page + * + * Then the real page entry is updated and the page is added to the + * appropriate list, and a new page is made current. + * + * PAGE POLICY + * + * Allocating new page to record pool is expensive. Therefore record + * pool should not always return empty pages to super pool. There are + * two trivial policies, each with problems: + * + * - "pp1" never return empty page to super pool + * - "pp2" always return empty page to super pool + * + * This implementation uses "pp2" for now. A real policy is implemented + * in next version. + * + * OPEN ISSUES AND LIMITATIONS + * + * - smarter (virtual) placement of check bits & page entries + * - should getPtr etc be inlined? (too much code) + * - real page policy + * - other implementations (only HeapPool is done) + * - super pool list of all record pools, for statistics etc + * - access by multiple threads is not supported + */ + +// align size +#define SP_ALIGN_SIZE(sz, al) \ + (((sz) + (al) - 1) & ~((al) - 1)) + +// align pointer relative to base +#define SP_ALIGN_PTR(p, base, al) \ + (void*)((Uint8*)(base) + SP_ALIGN_SIZE((Uint8*)(p) - (Uint8*)(base), (al))) + +class SuperPool { +public: + // Type of i-value, used to reference both pages and records. Page + // index "ip" occupies the high bits. The i-value of a page is same + // as i-value of record 0 on the page. + typedef Uint32 PtrI; + + // Size and address alignment given as number of bytes (power of 2). + STATIC_CONST( SP_ALIGN = 8 ); + + // Page entry. Current|y allocated as array of (2^pageBits). + struct PageEnt { + PageEnt(); + Uint32 m_pageType; + Uint32 m_freeRecI; + Uint32 m_useCount; + PtrI m_nextPageI; + PtrI m_prevPageI; + }; + + // Number of bits for cache effective type check given as log of 2. + // Example: 2 means 4 bits and uses 32k for 2g of 32k pages. + STATIC_CONST( SP_CHECK_LOG2 = 2 ); + + // Doubly-linked list of pages. There is one free list in super pool + // and free, active, full list in each record pool. + struct PageList { + PageList(); + PageList(PtrI pageI); + PtrI m_headPageI; + PtrI m_tailPageI; + Uint32 m_pageCount; + }; + + // Record pool information. Each record pool instance contains one. + struct RecInfo { + RecInfo(Uint32 recType, Uint32 recSize); + const Uint32 m_recType; + const Uint32 m_recSize; + Uint32 m_maxUseCount; // could be computed + Uint32 m_currPageI; // current page + Uint32 m_currFreeRecI; + Uint32 m_currUseCount; + Uint32 m_totalUseCount; // total per pool + Uint32 m_totalRecCount; + PageList m_freeList; + PageList m_activeList; + PageList m_fullList; + }; + + // Constructor. Gives page size in bytes (excluding page header) and + // number of bits to use for page index "ip" in i-value. + SuperPool(Uint32 pageSize, Uint32 pageBits); + + // Initialize. Must be called after setting sizes or other parameters + // and before the pool is used. + virtual bool init(); + + // Destructor. + virtual ~SuperPool() = 0; + + // Translate i-value to page entry. + PageEnt& getPageEnt(PtrI pageI); + + // Translate i-value to page address. + void* getPageP(PtrI pageI); + + // Translate page address to i-value (unused). + PtrI getPageI(void* pageP); + + // Given type, return non-zero reduced type check bits. + Uint32 makeCheckBits(Uint32 type); + + // Get type check bits from type check array. + Uint32 getCheckBits(PtrI pageI); + + // Set type check bits in type check array. + void setCheckBits(PtrI pageI, Uint32 type); + + // Translate i-value to record address. + void* getRecP(PtrI recI, RecInfo& ri); + + // Move all pages from second list to end of first list. + void movePages(PageList& pl1, PageList& pl2); + + // Add page to beginning of page list. + void addHeadPage(PageList& pl, PtrI pageI); + + // Add page to end of page list. + void addTailPage(PageList& pl, PtrI pageI); + + // Remove any page from page list. + void removePage(PageList& pl, PtrI pageI); + + // Set current page. Previous current page is updated and added to + // appropriate list. + void setCurrPage(RecInfo& ri, PtrI pageI); + + // Get page with some free records and make it current. Takes head of + // active or free list, or else gets free page from super pool. + bool getAvailPage(RecInfo& ri); + + // Get free page from super pool and add it to record pool free list. + // This is an expensive subroutine of getAvailPage(). + PtrI getFreePage(RecInfo& ri); + + // Get new free page from the implementation. + virtual PtrI getNewPage() = 0; + + // Set 3 size parameters, rounded to page size. If called before + // init() then init() allocates the initial size. + void setSizes(size_t initSize = 0, size_t incrSize = 0, size_t maxSize = 0); + + const Uint32 m_pageSize; + const Uint32 m_pageBits; + // implementation must set up these pointers + void* m_memRoot; + PageEnt* m_pageEnt; + Uint32* m_typeCheck; + Uint32 m_typeSeq; + PageList m_pageList; + size_t m_totalSize; + size_t m_initSize; + size_t m_incrSize; + size_t m_maxSize; + + // Debugging. + void verify(RecInfo& ri); +}; + +inline SuperPool::PageEnt& +SuperPool::getPageEnt(PtrI pageI) +{ + Uint32 ip = pageI >> (32 - m_pageBits); + return m_pageEnt[ip]; +} + +inline void* +SuperPool::getPageP(PtrI ptrI) +{ + Int32 ip = (Int32)ptrI >> (32 - m_pageBits); + my_ptrdiff_t sz = m_pageSize; + void* pageP = (Uint8*)m_memRoot + ip * sz; + return pageP; +} + +inline Uint32 +SuperPool::makeCheckBits(Uint32 type) +{ + Uint32 shift = 1 << SP_CHECK_LOG2; + Uint32 mask = (1 << shift) - 1; + return 1 + type % mask; +} + +inline Uint32 +SuperPool::getCheckBits(PtrI pageI) +{ + Uint32 ip = pageI >> (32 - m_pageBits); + Uint32 xp = ip >> (5 - SP_CHECK_LOG2); + Uint32 yp = ip & (1 << (5 - SP_CHECK_LOG2)) - 1; + Uint32& w = m_typeCheck[xp]; + Uint32 shift = 1 << SP_CHECK_LOG2; + Uint32 mask = (1 << shift) - 1; + // get + Uint32 bits = (w >> yp * shift) & mask; + return bits; +} + +inline void +SuperPool::setCheckBits(PtrI pageI, Uint32 type) +{ + Uint32 ip = pageI >> (32 - m_pageBits); + Uint32 xp = ip >> (5 - SP_CHECK_LOG2); + Uint32 yp = ip & (1 << (5 - SP_CHECK_LOG2)) - 1; + Uint32& w = m_typeCheck[xp]; + Uint32 shift = 1 << SP_CHECK_LOG2; + Uint32 mask = (1 << shift) - 1; + // set + Uint32 bits = makeCheckBits(type); + w &= ~(mask << yp * shift); + w |= (bits << yp * shift); +} + +inline void* +SuperPool::getRecP(PtrI ptrI, RecInfo& ri) +{ + const Uint32 recMask = (1 << (32 - m_pageBits)) - 1; + PtrI pageI = ptrI & ~recMask; +#if NDB_SP_VERIFY_LEVEL >= 1 + Uint32 bits1 = getCheckBits(pageI); + Uint32 bits2 = makeCheckBits(ri.m_recType); + assert(bits1 == bits2); +#endif + void* pageP = getPageP(pageI); + Uint32 ir = ptrI & recMask; + void* recP = (Uint8*)pageP + ir * ri.m_recSize; + return recP; +} + +/* + * HeapPool - SuperPool on heap (concrete class) + * + * A super pool based on malloc with memory root on the heap. This + * pool type has 2 realistic uses: + * + * - a small pool with only initial malloc and pageBits set to match + * - the big pool from which all heap allocations are done + * + * A "smart" malloc may break "ip" limit by using different VM areas for + * different sized requests. For this reason malloc is done in units of + * increment size if possible. Memory root is set to start of first + * malloc. + */ + +class HeapPool : public SuperPool { +public: + // Describes malloc area. The areas are kept in singly linked list. + // There is a list head and pointers to current and last area. + struct Area { + Area(); + Area* m_nextArea; + PtrI m_firstPageI; + Uint32 m_currPage; + Uint32 m_numPages; + void* m_memory; + }; + + // Constructor. + HeapPool(Uint32 pageSize, Uint32 pageBits); + + // Initialize. + virtual bool init(); + + // Destructor. + virtual ~HeapPool(); + + // Use malloc to allocate more. + bool allocMoreData(size_t size); + + // Get new page from current area. + virtual PtrI getNewPage(); + + // List of malloc areas. + Area m_areaHead; + Area* m_currArea; + Area* m_lastArea; + + // Fraction of malloc size to try if cannot get all in one. + Uint32 m_mallocPart; +}; + +/* + * RecordPool - record pool using one super pool instance (template) + * + * Documented under SuperPool. Satisfies ArrayPool interface. + */ + +template <class T> +class RecordPool { +public: + // Constructor. + RecordPool(SuperPool& superPool); + + // Destructor. + ~RecordPool(); + + // Update pointer ptr.p according to i-value ptr.i. + void getPtr(Ptr<T>& ptr); + + // Allocate record from the pool. + bool seize(Ptr<T>& ptr); + + // Return record to the pool. + void release(Ptr<T>& ptr); + + // todo variants of basic methods + + // Return all pages to super pool. The force flag is required if + // there are any used records. + void free(bool force); + + SuperPool& m_superPool; + SuperPool::RecInfo m_recInfo; +}; + +template <class T> +inline +RecordPool<T>::RecordPool(SuperPool& superPool) : + m_superPool(superPool), + m_recInfo(1 + superPool.m_typeSeq++, sizeof(T)) +{ + SuperPool::RecInfo& ri = m_recInfo; + assert(sizeof(T) == SP_ALIGN_SIZE(sizeof(T), sizeof(Uint32))); + Uint32 maxUseCount = superPool.m_pageSize / sizeof(T); + Uint32 sizeLimit = 1 << (32 - superPool.m_pageBits); + if (maxUseCount >= sizeLimit) + maxUseCount = sizeLimit; + ri.m_maxUseCount = maxUseCount; +} + +template <class T> +inline +RecordPool<T>::~RecordPool() +{ + free(true); +} + +template <class T> +inline void +RecordPool<T>::getPtr(Ptr<T>& ptr) +{ + void* recP = m_superPool.getRecP(ptr.i, m_recInfo); + ptr.p = static_cast<T*>(recP); +} + +template <class T> +inline bool +RecordPool<T>::seize(Ptr<T>& ptr) +{ + SuperPool& sp = m_superPool; + SuperPool::RecInfo& ri = m_recInfo; + if (ri.m_currFreeRecI != RNIL || sp.getAvailPage(ri)) { + SuperPool::PtrI recI = ri.m_currFreeRecI; + void* recP = sp.getRecP(recI, ri); + ri.m_currFreeRecI = *(Uint32*)recP; + Uint32 useCount = ri.m_currUseCount; + assert(useCount < ri.m_maxUseCount); + ri.m_currUseCount = useCount + 1; + ri.m_totalUseCount++; + ptr.i = recI; + ptr.p = static_cast<T*>(recP); + return true; + } + return false; +} + +template <class T> +inline void +RecordPool<T>::release(Ptr<T>& ptr) +{ + SuperPool& sp = m_superPool; + SuperPool::RecInfo& ri = m_recInfo; + const Uint32 recMask = (1 << (32 - sp.m_pageBits)) - 1; + SuperPool::PtrI recI = ptr.i; + SuperPool::PtrI pageI = recI & ~recMask; + if (pageI != ri.m_currPageI) { + sp.setCurrPage(ri, pageI); + } + void* recP = sp.getRecP(recI, ri); + *(Uint32*)recP = ri.m_currFreeRecI; + ri.m_currFreeRecI = recI; + Uint32 useCount = ri.m_currUseCount; + assert(useCount != 0); + ri.m_currUseCount = useCount - 1; + ri.m_totalUseCount--; + ptr.i = RNIL; + ptr.p = 0; +} + +template <class T> +inline void +RecordPool<T>::free(bool force) +{ + SuperPool& sp = m_superPool; + SuperPool::RecInfo& ri = m_recInfo; + sp.setCurrPage(ri, RNIL); + assert(force || ri.m_totalUseCount == 0); + sp.movePages(sp.m_pageList, ri.m_freeList); + sp.movePages(sp.m_pageList, ri.m_activeList); + sp.movePages(sp.m_pageList, ri.m_fullList); + ri.m_totalRecCount = 0; +} + +#endif diff --git a/storage/ndb/src/kernel/vm/testSuperPool.cpp b/storage/ndb/src/kernel/vm/testSuperPool.cpp new file mode 100644 index 00000000000..194b3a43fa0 --- /dev/null +++ b/storage/ndb/src/kernel/vm/testSuperPool.cpp @@ -0,0 +1,220 @@ +#if 0 +make -f Makefile -f - testSuperPool <<'_eof_' +testSuperPool: testSuperPool.cpp libkernel.a + $(CXXCOMPILE) -o $@ $@.cpp libkernel.a -L../../common/util/.libs -lgeneral +_eof_ +exit $? +#endif + +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "SuperPool.hpp" +#include <NdbOut.hpp> + +template <Uint32 sz> +struct A { + Uint32 a[sz]; + void fill() { + Uint32 c = 0; + for (Uint32 i = 0; i + 1 < sz; i++) { + a[i] = random(); + c = (c << 1) ^ a[i]; + } + a[sz - 1] = c; + } + void check() { + Uint32 c = 0; + for (Uint32 i = 0; i + 1 < sz; i++) { + c = (c << 1) ^ a[i]; + } + assert(a[sz - 1] == c); + } +}; + +static Uint32 +urandom(Uint32 n) +{ + return (Uint32)random() % n; +} + +static Uint32 +random_coprime(Uint32 n) +{ + Uint32 prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 }; + Uint32 count = sizeof(prime) / sizeof(prime[0]); + while (1) { + Uint32 i = urandom(count); + if (n % prime[i] != 0) + return prime[i]; + } +} + +static int +cmpPtrI(const void* a, const void* b) +{ + Ptr<const void> u = *(Ptr<const void>*)a; + Ptr<const void> v = *(Ptr<const void>*)b; + return u.i < v.i ? -1 : u.i > v.i ? +1 : 0; +} + +static int +cmpPtrP(const void* a, const void* b) +{ + Ptr<const void> u = *(Ptr<const void>*)a; + Ptr<const void> v = *(Ptr<const void>*)b; + return u.p < v.p ? -1 : u.p > v.p ? +1 : 0; +} + +static Uint32 loopcount = 3; + +template <Uint32 sz> +void +sp_test(SuperPool& sp) +{ + typedef A<sz> T; + RecordPool<T> rp(sp); + SuperPool::RecInfo& ri = rp.m_recInfo; + Uint32 pageCount = sp.m_totalSize / sp.m_pageSize; + Uint32 perPage = rp.m_recInfo.m_maxUseCount; + Uint32 perPool = perPage * pageCount; + ndbout << "pages=" << pageCount << " perpage=" << perPage << " perpool=" << perPool << endl; + Ptr<T>* ptrList = new Ptr<T> [perPool]; + memset(ptrList, 0x1f, perPool * sizeof(Ptr<T>)); + Uint32 loop; + for (loop = 0; loop < loopcount; loop++) { + ndbout << "loop " << loop << endl; + Uint32 i, j; + // seize all + ndbout << "seize all" << endl; + for (i = 0; i < perPool + 1; i++) { + j = i; + sp.verify(ri); + Ptr<T> ptr1 = { 0, RNIL }; + if (! rp.seize(ptr1)) + break; + // write value + ptr1.p->fill(); + ptr1.p->check(); + // verify getPtr + Ptr<T> ptr2 = { 0, ptr1.i }; + rp.getPtr(ptr2); + assert(ptr1.i == ptr2.i && ptr1.p == ptr2.p); + // save + ptrList[j] = ptr1; + } + assert(i == perPool); + assert(ri.m_totalUseCount == perPool && ri.m_totalRecCount == perPool); + sp.verify(ri); + // check duplicates + { + Ptr<T>* ptrList2 = new Ptr<T> [perPool]; + memcpy(ptrList2, ptrList, perPool * sizeof(Ptr<T>)); + qsort(ptrList2, perPool, sizeof(Ptr<T>), cmpPtrI); + for (i = 1; i < perPool; i++) + assert(ptrList2[i - 1].i != ptrList2[i].i); + qsort(ptrList2, perPool, sizeof(Ptr<T>), cmpPtrP); + for (i = 1; i < perPool; i++) + assert(ptrList2[i - 1].p != ptrList2[i].p); + delete [] ptrList2; + } + // release all in various orders + ndbout << "release all" << endl; + Uint32 coprime = random_coprime(perPool); + for (i = 0; i < perPool; i++) { + sp.verify(ri); + switch (loop % 3) { + case 0: // ascending + j = i; + break; + case 1: // descending + j = perPool - 1 - i; + break; + case 2: // pseudo-random + j = (coprime * i) % perPool; + break; + } + Ptr<T>& ptr = ptrList[j]; + assert(ptr.i != RNIL && ptr.p != 0); + ptr.p->check(); + rp.release(ptr); + assert(ptr.i == RNIL && ptr.p == 0); + } + sp.setCurrPage(ri, RNIL); + assert(ri.m_totalUseCount == 0 && ri.m_totalRecCount == 0); + sp.verify(ri); + // seize/release at random + ndbout << "seize/release at random" << endl; + for (i = 0; i < loopcount * perPool; i++) { + j = urandom(perPool); + Ptr<T>& ptr = ptrList[j]; + if (ptr.i == RNIL) { + rp.seize(ptr); + ptr.p->fill(); + } else { + ptr.p->check(); + rp.release(ptr); + } + } + ndbout << "used " << ri.m_totalUseCount << endl; + sp.verify(ri); + // release all + ndbout << "release all" << endl; + for (i = 0; i < perPool; i++) { + j = i; + Ptr<T>& ptr = ptrList[j]; + if (ptr.i != RNIL) { + ptr.p->check(); + rp.release(ptr); + } + } + sp.setCurrPage(ri, RNIL); + assert(ri.m_totalUseCount == 0 && ri.m_totalRecCount == 0); + sp.verify(ri); + } + // done + delete [] ptrList; +} + +static Uint32 pageCount = 99; +static Uint32 pageSize = 32768; +static Uint32 pageBits = 15; + +const Uint32 sz1 = 3, sz2 = 4, sz3 = 53, sz4 = 424, sz5 = 5353; + +template void sp_test<sz1>(SuperPool& sp); +template void sp_test<sz2>(SuperPool& sp); +template void sp_test<sz3>(SuperPool& sp); +template void sp_test<sz4>(SuperPool& sp); +template void sp_test<sz5>(SuperPool& sp); + +int +main() +{ + HeapPool sp(pageSize, pageBits); + sp.setSizes(pageCount * pageSize); + if (! sp.init()) + assert(false); + Uint16 s = (Uint16)getpid(); + srandom(s); + ndbout << "rand " << s << endl; + sp_test<sz1>(sp); + sp_test<sz2>(sp); + sp_test<sz3>(sp); + sp_test<sz4>(sp); + sp_test<sz5>(sp); + return 0; +} |