diff options
Diffstat (limited to 'ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp')
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp | 633 |
1 files changed, 633 insertions, 0 deletions
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp new file mode 100644 index 00000000000..6733a87da97 --- /dev/null +++ b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp @@ -0,0 +1,633 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#define DBTUX_NODE_CPP +#include "Dbtux.hpp" + +/* + * Node handles. + * + * We use the "cache" implementation. Node operations are done on + * cached copies. Index memory is updated at the end of the operation. + * At most one node is inserted and it is always pre-allocated. + * + * An alternative "pointer" implementation which writes directly into + * index memory is planned for later. + */ + +// Dbtux + +void +Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr) +{ + if (! c_nodeHandlePool.seize(nodePtr)) { + jam(); + return; + } + new (nodePtr.p) NodeHandle(*this, frag); + nodePtr.p->m_next = frag.m_nodeList; + frag.m_nodeList = nodePtr.i; + // node cache used always + nodePtr.p->m_node = (TreeNode*)nodePtr.p->m_cache; + new (nodePtr.p->m_node) TreeNode(); +#ifdef VM_TRACE + TreeHead& tree = frag.m_tree; + TreeNode* node = nodePtr.p->m_node; + memset(tree.getPref(node, 0), 0xa2, tree.m_prefSize << 2); + memset(tree.getPref(node, 1), 0xa2, tree.m_prefSize << 2); + TreeEnt* entList = tree.getEntList(node); + memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2)); +#endif +} + +void +Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode) +{ + ndbrequire(frag.m_nodeFree == RNIL); + NodeHandlePtr nodePtr; + seizeNode(signal, frag, nodePtr); + ndbrequire(nodePtr.i != RNIL); + // remove from cache XXX ugly + frag.m_nodeFree = frag.m_nodeList; + frag.m_nodeList = nodePtr.p->m_next; + StorePar storePar; + storePar.m_opCode = TupStoreTh::OpInsert; + storePar.m_offset = 0; + storePar.m_size = 0; + tupStoreTh(signal, frag, nodePtr, storePar); + if (storePar.m_errorCode != 0) { + jam(); + errorCode = storePar.m_errorCode; + c_nodeHandlePool.release(nodePtr); + frag.m_nodeFree = RNIL; + } +} + +/* + * Find node in the cache. XXX too slow, use direct links instead + */ +void +Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr) +{ + NodeHandlePtr tmpPtr; + tmpPtr.i = frag.m_nodeList; + while (tmpPtr.i != RNIL) { + jam(); + c_nodeHandlePool.getPtr(tmpPtr); + if (tmpPtr.p->m_addr == addr) { + jam(); + nodePtr = tmpPtr; + return; + } + tmpPtr.i = tmpPtr.p->m_next; + } + nodePtr.i = RNIL; + nodePtr.p = 0; +} + +/* + * Get handle for existing node. + */ +void +Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr, AccSize acc) +{ + ndbrequire(addr != NullTupAddr && acc > AccNone); + NodeHandlePtr tmpPtr; + // search in cache + findNode(signal, frag, tmpPtr, addr); + if (tmpPtr.i == RNIL) { + jam(); + // add new node + seizeNode(signal, frag, tmpPtr); + ndbrequire(tmpPtr.i != RNIL); + tmpPtr.p->m_addr = addr; + } + if (tmpPtr.p->m_acc < acc) { + jam(); + accessNode(signal, frag, tmpPtr, acc); + } + nodePtr = tmpPtr; +} + +/* + * Create new node in the cache and mark it for insert. + */ +void +Dbtux::insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc) +{ + ndbrequire(acc > AccNone); + NodeHandlePtr tmpPtr; + // use the pre-allocated node + tmpPtr.i = frag.m_nodeFree; + frag.m_nodeFree = RNIL; + c_nodeHandlePool.getPtr(tmpPtr); + // move it to the cache + tmpPtr.p->m_next = frag.m_nodeList; + frag.m_nodeList = tmpPtr.i; + tmpPtr.p->m_acc = acc; + tmpPtr.p->m_flags |= NodeHandle::DoInsert; + nodePtr = tmpPtr; +} + +/* + * Mark existing node for deletion. + */ +void +Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr) +{ + NodeHandlePtr tmpPtr = nodePtr; + ndbrequire(tmpPtr.p->getOccup() == 0); + tmpPtr.p->m_flags |= NodeHandle::DoDelete; + // scans have already been moved by popDown or popUp +} + +/* + * Access more of the node. + */ +void +Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc) +{ + TreeHead& tree = frag.m_tree; + NodeHandlePtr tmpPtr = nodePtr; + if (tmpPtr.p->m_acc >= acc) + return; + if (! (tmpPtr.p->m_flags & NodeHandle::DoInsert)) { + jam(); + StorePar storePar; + storePar.m_opCode = TupStoreTh::OpRead; + storePar.m_offset = tree.getSize(tmpPtr.p->m_acc); + storePar.m_size = tree.getSize(acc) - tree.getSize(tmpPtr.p->m_acc); + tmpPtr.p->m_tux.tupStoreTh(signal, frag, tmpPtr, storePar); + ndbrequire(storePar.m_errorCode == 0); + } + tmpPtr.p->m_acc = acc; +} + +/* + * Set prefix. + */ +void +Dbtux::setNodePref(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i) +{ + TreeHead& tree = frag.m_tree; + NodeHandlePtr tmpPtr = nodePtr; + ReadPar readPar; + ndbrequire(i <= 1); + readPar.m_ent = tmpPtr.p->getMinMax(i); + readPar.m_first = 0; + readPar.m_count = frag.m_numAttrs; + // leave in signal data + readPar.m_data = 0; + // XXX implement max words to read + tupReadAttrs(signal, frag, readPar); + // copy whatever fits + CopyPar copyPar; + copyPar.m_items = readPar.m_count; + copyPar.m_headers = true; + copyPar.m_maxwords = tree.m_prefSize; + Data pref = tmpPtr.p->getPref(i); + copyAttrs(pref, readPar.m_data, copyPar); + nodePtr.p->m_flags |= NodeHandle::DoUpdate; +} + +/* + * Commit and release nodes at the end of an operation. Used also on + * error since no changes have been made (updateOk false). + */ +void +Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk) +{ + TreeHead& tree = frag.m_tree; + NodeHandlePtr nodePtr; + nodePtr.i = frag.m_nodeList; + frag.m_nodeList = RNIL; + while (nodePtr.i != RNIL) { + c_nodeHandlePool.getPtr(nodePtr); + const unsigned flags = nodePtr.p->m_flags; + if (flags & NodeHandle::DoDelete) { + jam(); + ndbrequire(updateOk); + // delete + StorePar storePar; + storePar.m_opCode = TupStoreTh::OpDelete; + nodePtr.p->m_tux.tupStoreTh(signal, frag, nodePtr, storePar); + ndbrequire(storePar.m_errorCode == 0); + } else if (flags & NodeHandle::DoUpdate) { + jam(); + ndbrequire(updateOk); + // set prefixes + if (flags & (1 << 0)) { + jam(); + setNodePref(signal, frag, nodePtr, 0); + } + if (flags & (1 << 1)) { + jam(); + setNodePref(signal, frag, nodePtr, 1); + } + // update + StorePar storePar; + storePar.m_opCode = TupStoreTh::OpUpdate; + storePar.m_offset = 0; + storePar.m_size = tree.getSize(nodePtr.p->m_acc); + nodePtr.p->m_tux.tupStoreTh(signal, frag, nodePtr, storePar); + ndbrequire(storePar.m_errorCode == 0); + } + // release + NodeHandlePtr tmpPtr = nodePtr; + nodePtr.i = nodePtr.p->m_next; + c_nodeHandlePool.release(tmpPtr); + } +} + +// Dbtux::NodeHandle + +/* + * Add entry at position. Move entries greater than or equal to the old + * one (if any) to the right. + * + * X + * v + * A B C D E _ _ => A B C X D E _ + * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + */ +void +Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent) +{ + TreeHead& tree = m_frag.m_tree; + const unsigned occup = getOccup(); + ndbrequire(occup < tree.m_maxOccup && pos <= occup); + // fix scans + ScanOpPtr scanPtr; + scanPtr.i = getNodeScan(); + while (scanPtr.i != RNIL) { + jam(); + m_tux.c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + if (scanPos.m_pos >= pos) { + jam(); +#ifdef VM_TRACE + if (m_tux.debugFlags & m_tux.DebugScan) { + m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + m_tux.debugOut << "At pushUp pos=" << pos << " " << *this << endl; + } +#endif + scanPos.m_pos++; + } + scanPtr.i = scanPtr.p->m_nodeScan; + } + // fix node + TreeEnt* const entList = tree.getEntList(m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + for (unsigned i = occup; i > pos; i--) { + jam(); + tmpList[i] = tmpList[i - 1]; + } + tmpList[pos] = ent; + if (occup == 0 || pos == 0) + m_flags |= (1 << 0); + if (occup == 0 || pos == occup) + m_flags |= (1 << 1); + entList[0] = entList[occup + 1]; + setOccup(occup + 1); + m_flags |= DoUpdate; +} + +/* + * Remove and return entry at position. Move entries greater than the + * removed one to the left. This is the opposite of pushUp. + * + * D + * ^ ^ + * A B C D E F _ => A B C E F _ _ + * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + */ +void +Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent) +{ + TreeHead& tree = m_frag.m_tree; + const unsigned occup = getOccup(); + ndbrequire(occup <= tree.m_maxOccup && pos < occup); + ScanOpPtr scanPtr; + // move scans whose entry disappears + scanPtr.i = getNodeScan(); + while (scanPtr.i != RNIL) { + jam(); + m_tux.c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + if (scanPos.m_pos == pos) { + jam(); +#ifdef VM_TRACE + if (m_tux.debugFlags & m_tux.DebugScan) { + m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; + m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl; + } +#endif + m_tux.scanNext(signal, scanPtr); + } + scanPtr.i = nextPtrI; + } + // fix other scans + scanPtr.i = getNodeScan(); + while (scanPtr.i != RNIL) { + jam(); + m_tux.c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_pos != pos); + if (scanPos.m_pos > pos) { + jam(); +#ifdef VM_TRACE + if (m_tux.debugFlags & m_tux.DebugScan) { + m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl; + } +#endif + scanPos.m_pos--; + } + scanPtr.i = scanPtr.p->m_nodeScan; + } + // fix node + TreeEnt* const entList = tree.getEntList(m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + ent = tmpList[pos]; + for (unsigned i = pos; i < occup - 1; i++) { + jam(); + tmpList[i] = tmpList[i + 1]; + } + if (occup != 1 && pos == 0) + m_flags |= (1 << 0); + if (occup != 1 && pos == occup - 1) + m_flags |= (1 << 1); + entList[0] = entList[occup - 1]; + setOccup(occup - 1); + m_flags |= DoUpdate; +} + +/* + * Add entry at existing position. Move entries less than or equal to + * the old one to the left. Remove and return old min entry. + * + * X A + * ^ v ^ + * A B C D E _ _ => B C D X E _ _ + * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + */ +void +Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent) +{ + TreeHead& tree = m_frag.m_tree; + const unsigned occup = getOccup(); + ndbrequire(occup <= tree.m_maxOccup && pos < occup); + ScanOpPtr scanPtr; + // move scans whose entry disappears + scanPtr.i = getNodeScan(); + while (scanPtr.i != RNIL) { + jam(); + m_tux.c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + if (scanPos.m_pos == 0) { + jam(); +#ifdef VM_TRACE + if (m_tux.debugFlags & m_tux.DebugScan) { + m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; + m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl; + } +#endif + // here we may miss a valid entry "X" XXX known bug + m_tux.scanNext(signal, scanPtr); + } + scanPtr.i = nextPtrI; + } + // fix other scans + scanPtr.i = getNodeScan(); + while (scanPtr.i != RNIL) { + jam(); + m_tux.c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_pos != 0); + if (scanPos.m_pos <= pos) { + jam(); +#ifdef VM_TRACE + if (m_tux.debugFlags & m_tux.DebugScan) { + m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl; + } +#endif + scanPos.m_pos--; + } + scanPtr.i = scanPtr.p->m_nodeScan; + } + // fix node + TreeEnt* const entList = tree.getEntList(m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + TreeEnt oldMin = tmpList[0]; + for (unsigned i = 0; i < pos; i++) { + jam(); + tmpList[i] = tmpList[i + 1]; + } + tmpList[pos] = ent; + ent = oldMin; + if (true) + m_flags |= (1 << 0); + if (occup == 1 || pos == occup - 1) + m_flags |= (1 << 1); + entList[0] = entList[occup]; + m_flags |= DoUpdate; +} + +/* + * Remove and return entry at position. Move entries less than the + * removed one to the right. Replace min entry by the input entry. + * This is the opposite of pushDown. + * + * X D + * v ^ ^ + * A B C D E _ _ => X A B C E _ _ + * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + */ +void +Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent) +{ + TreeHead& tree = m_frag.m_tree; + const unsigned occup = getOccup(); + ndbrequire(occup <= tree.m_maxOccup && pos < occup); + ScanOpPtr scanPtr; + // move scans whose entry disappears + scanPtr.i = getNodeScan(); + while (scanPtr.i != RNIL) { + jam(); + m_tux.c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + if (scanPos.m_pos == pos) { + jam(); +#ifdef VM_TRACE + if (m_tux.debugFlags & m_tux.DebugScan) { + m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; + m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl; + } +#endif + // here we may miss a valid entry "X" XXX known bug + m_tux.scanNext(signal, scanPtr); + } + scanPtr.i = nextPtrI; + } + // fix other scans + scanPtr.i = getNodeScan(); + while (scanPtr.i != RNIL) { + jam(); + m_tux.c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup); + ndbrequire(scanPos.m_pos != pos); + if (scanPos.m_pos < pos) { + jam(); +#ifdef VM_TRACE + if (m_tux.debugFlags & m_tux.DebugScan) { + m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl; + m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl; + } +#endif + scanPos.m_pos++; + } + scanPtr.i = scanPtr.p->m_nodeScan; + } + // fix node + TreeEnt* const entList = tree.getEntList(m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + TreeEnt newMin = ent; + ent = tmpList[pos]; + for (unsigned i = pos; i > 0; i--) { + jam(); + tmpList[i] = tmpList[i - 1]; + } + tmpList[0] = newMin; + if (true) + m_flags |= (1 << 0); + if (occup == 1 || pos == occup - 1) + m_flags |= (1 << 1); + entList[0] = entList[occup]; + m_flags |= DoUpdate; +} + +/* + * Move all possible entries from another node before the min (i=0) or + * after the max (i=1). XXX can be optimized + */ +void +Dbtux::NodeHandle::slide(Signal* signal, NodeHandlePtr nodePtr, unsigned i) +{ + ndbrequire(i <= 1); + TreeHead& tree = m_frag.m_tree; + while (getOccup() < tree.m_maxOccup && nodePtr.p->getOccup() != 0) { + TreeEnt ent; + nodePtr.p->popDown(signal, i == 0 ? nodePtr.p->getOccup() - 1 : 0, ent); + pushUp(signal, i == 0 ? 0 : getOccup(), ent); + } +} + +/* + * Link scan to the list under the node. The list is single-linked and + * ordering does not matter. + */ +void +Dbtux::NodeHandle::linkScan(Dbtux::ScanOpPtr scanPtr) +{ +#ifdef VM_TRACE + if (m_tux.debugFlags & m_tux.DebugScan) { + m_tux.debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl; + m_tux.debugOut << "To node " << *this << endl; + } +#endif + ndbrequire(! islinkScan(scanPtr) && scanPtr.p->m_nodeScan == RNIL); + scanPtr.p->m_nodeScan = getNodeScan(); + setNodeScan(scanPtr.i); +} + +/* + * Unlink a scan from the list under the node. + */ +void +Dbtux::NodeHandle::unlinkScan(Dbtux::ScanOpPtr scanPtr) +{ +#ifdef VM_TRACE + if (m_tux.debugFlags & m_tux.DebugScan) { + m_tux.debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl; + m_tux.debugOut << "From node " << *this << endl; + } +#endif + Dbtux::ScanOpPtr currPtr; + currPtr.i = getNodeScan(); + Dbtux::ScanOpPtr prevPtr; + prevPtr.i = RNIL; + while (true) { + jam(); + m_tux.c_scanOpPool.getPtr(currPtr); + Uint32 nextPtrI = currPtr.p->m_nodeScan; + if (currPtr.i == scanPtr.i) { + jam(); + if (prevPtr.i == RNIL) { + setNodeScan(nextPtrI); + } else { + jam(); + prevPtr.p->m_nodeScan = nextPtrI; + } + scanPtr.p->m_nodeScan = RNIL; + // check for duplicates + ndbrequire(! islinkScan(scanPtr)); + return; + } + prevPtr = currPtr; + currPtr.i = nextPtrI; + } +} + +/* + * Check if a scan is linked to this node. Only for ndbrequire. + */ +bool +Dbtux::NodeHandle::islinkScan(Dbtux::ScanOpPtr scanPtr) +{ + Dbtux::ScanOpPtr currPtr; + currPtr.i = getNodeScan(); + while (currPtr.i != RNIL) { + jam(); + m_tux.c_scanOpPool.getPtr(currPtr); + if (currPtr.i == scanPtr.i) { + jam(); + return true; + } + currPtr.i = currPtr.p->m_nodeScan; + } + return false; +} + +void +Dbtux::NodeHandle::progError(int line, int cause, const char* extra) +{ + m_tux.progError(line, cause, extra); +} |