diff options
author | unknown <pekka@mysql.com> | 2004-10-17 13:12:25 +0200 |
---|---|---|
committer | unknown <pekka@mysql.com> | 2004-10-17 13:12:25 +0200 |
commit | aa6fb64662bbdf34fbb9251ad7ceaf0c5eb46497 (patch) | |
tree | ab83f1231837a98c942404f76767cdf59dfbdfe8 /ndb | |
parent | d49b8ac4bb1d652f0228c44f7d5ab5f8de9c2989 (diff) | |
download | mariadb-git-aa6fb64662bbdf34fbb9251ad7ceaf0c5eb46497.tar.gz |
NDB wl-1942 dbtux - move scans correctly at index tree re-org
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/Dbtux.hpp | 15 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp | 368 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp | 8 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp | 23 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtux/Times.txt | 3 |
5 files changed, 265 insertions, 152 deletions
diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp index 780ac55ff4e..144fc3320b8 100644 --- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp +++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp @@ -587,12 +587,19 @@ private: void deleteNode(Signal* signal, NodeHandle& node); void setNodePref(Signal* signal, NodeHandle& node); // node operations - void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent); - void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); - void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); - void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); + void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList); + void nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos); + void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList); + void nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos); + void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList); + void nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos); + void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList); + void nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos); void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i); // scans linked to node + void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList); + void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList); + void moveScanList(Signal* signal, NodeHandle& node, unsigned pos); void linkScan(NodeHandle& node, ScanOpPtr scanPtr); void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr); bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp index b4a30c70f19..491c8a4c74f 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp @@ -117,18 +117,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node) * v * A B C D E _ _ => A B C X D E _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Add list of scans at the new entry. */ void -Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent) +Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup < tree.m_maxOccup && pos <= occup); - // fix scans + // fix old scans + if (node.getNodeScan() != RNIL) + nodePushUpScans(signal, node, pos); + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + for (unsigned i = occup; i > pos; i--) { + jam(); + tmpList[i] = tmpList[i - 1]; + } + tmpList[pos] = ent; + entList[0] = entList[occup + 1]; + node.setOccup(occup + 1); + // add new scans + if (scanList != RNIL) + addScanList(node, pos, scanList); + // fix prefix + if (occup == 0 || pos == 0) + setNodePref(signal, node); +} + +void +Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); ScanOpPtr scanPtr; scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + do { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; @@ -144,21 +171,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& scanPos.m_pos++; } scanPtr.i = scanPtr.p->m_nodeScan; - } - // fix node - TreeEnt* const entList = tree.getEntList(node.m_node); - entList[occup] = entList[0]; - TreeEnt* const tmpList = entList + 1; - for (unsigned i = occup; i > pos; i--) { - jam(); - tmpList[i] = tmpList[i - 1]; - } - tmpList[pos] = ent; - entList[0] = entList[occup + 1]; - node.setOccup(occup + 1); - // fix prefix - if (occup == 0 || pos == 0) - setNodePref(signal, node); + } while (scanPtr.i != RNIL); } /* @@ -169,42 +182,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& * ^ ^ * A B C D E F _ => A B C E F _ _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Scans at removed entry are returned if non-zero location is passed or + * else moved forward. */ void -Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) +Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); - ScanOpPtr scanPtr; - // move scans whose entry disappears - scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + if (node.getNodeScan() != RNIL) { + // remove or move scans at this position + if (scanList == 0) + moveScanList(signal, node, pos); + else + removeScanList(node, pos, *scanList); + // fix other scans + if (node.getNodeScan() != RNIL) + nodePopDownScans(signal, node, pos); + } + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + ent = tmpList[pos]; + for (unsigned i = pos; i < occup - 1; i++) { jam(); - c_scanOpPool.getPtr(scanPtr); - TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); - const Uint32 nextPtrI = scanPtr.p->m_nodeScan; - if (scanPos.m_pos == pos) { - jam(); -#ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; - debugOut << "At popDown pos=" << pos << " " << node << endl; - } -#endif - scanNext(signal, scanPtr); - } - scanPtr.i = nextPtrI; + tmpList[i] = tmpList[i + 1]; } - // fix other scans + entList[0] = entList[occup - 1]; + node.setOccup(occup - 1); + // fix prefix + if (occup != 1 && pos == 0) + setNodePref(signal, node); +} + +void +Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + do { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); + // handled before ndbrequire(scanPos.m_pos != pos); if (scanPos.m_pos > pos) { jam(); @@ -217,21 +243,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) scanPos.m_pos--; } scanPtr.i = scanPtr.p->m_nodeScan; - } - // fix node - TreeEnt* const entList = tree.getEntList(node.m_node); - entList[occup] = entList[0]; - TreeEnt* const tmpList = entList + 1; - ent = tmpList[pos]; - for (unsigned i = pos; i < occup - 1; i++) { - jam(); - tmpList[i] = tmpList[i + 1]; - } - entList[0] = entList[occup - 1]; - node.setOccup(occup - 1); - // fix prefix - if (occup != 1 && pos == 0) - setNodePref(signal, node); + } while (scanPtr.i != RNIL); } /* @@ -242,43 +254,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) * ^ v ^ * A B C D E _ _ => B C D X E _ _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Return list of scans at the removed position 0. */ void -Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) +Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); - ScanOpPtr scanPtr; - // move scans whose entry disappears - scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + if (node.getNodeScan() != RNIL) { + // remove scans at 0 + removeScanList(node, 0, scanList); + // fix other scans + if (node.getNodeScan() != RNIL) + nodePushDownScans(signal, node, pos); + } + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + TreeEnt oldMin = tmpList[0]; + for (unsigned i = 0; i < pos; i++) { jam(); - c_scanOpPool.getPtr(scanPtr); - TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); - const Uint32 nextPtrI = scanPtr.p->m_nodeScan; - if (scanPos.m_pos == 0) { - jam(); -#ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; - debugOut << "At pushDown pos=" << pos << " " << node << endl; - } -#endif - // here we may miss a valid entry "X" XXX known bug - scanNext(signal, scanPtr); - } - scanPtr.i = nextPtrI; + tmpList[i] = tmpList[i + 1]; } - // fix other scans + tmpList[pos] = ent; + ent = oldMin; + entList[0] = entList[occup]; + // fix prefix + if (true) + setNodePref(signal, node); +} + +void +Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + do { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); + // handled before ndbrequire(scanPos.m_pos != 0); if (scanPos.m_pos <= pos) { jam(); @@ -291,22 +312,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent scanPos.m_pos--; } scanPtr.i = scanPtr.p->m_nodeScan; - } - // fix node - TreeEnt* const entList = tree.getEntList(node.m_node); - entList[occup] = entList[0]; - TreeEnt* const tmpList = entList + 1; - TreeEnt oldMin = tmpList[0]; - for (unsigned i = 0; i < pos; i++) { - jam(); - tmpList[i] = tmpList[i + 1]; - } - tmpList[pos] = ent; - ent = oldMin; - entList[0] = entList[occup]; - // fix prefix - if (true) - setNodePref(signal, node); + } while (scanPtr.i != RNIL); } /* @@ -318,39 +324,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent * v ^ ^ * A B C D E _ _ => X A B C E _ _ * 0 1 2 3 4 5 6 0 1 2 3 4 5 6 + * + * Move scans at removed entry and add scans at the new entry. */ void -Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) +Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList) { Frag& frag = node.m_frag; TreeHead& tree = frag.m_tree; const unsigned occup = node.getOccup(); ndbrequire(occup <= tree.m_maxOccup && pos < occup); - ScanOpPtr scanPtr; - // move scans whose entry disappears - scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + if (node.getNodeScan() != RNIL) { + // move scans whose entry disappears + moveScanList(signal, node, pos); + // fix other scans + if (node.getNodeScan() != RNIL) + nodePopUpScans(signal, node, pos); + } + // fix node + TreeEnt* const entList = tree.getEntList(node.m_node); + entList[occup] = entList[0]; + TreeEnt* const tmpList = entList + 1; + TreeEnt newMin = ent; + ent = tmpList[pos]; + for (unsigned i = pos; i > 0; i--) { jam(); - c_scanOpPool.getPtr(scanPtr); - TreePos& scanPos = scanPtr.p->m_scanPos; - ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); - const Uint32 nextPtrI = scanPtr.p->m_nodeScan; - if (scanPos.m_pos == pos) { - jam(); -#ifdef VM_TRACE - if (debugFlags & DebugScan) { - debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; - debugOut << "At popUp pos=" << pos << " " << node << endl; - } -#endif - // here we may miss a valid entry "X" XXX known bug - scanNext(signal, scanPtr); - } - scanPtr.i = nextPtrI; + tmpList[i] = tmpList[i - 1]; } - // fix other scans + tmpList[0] = newMin; + entList[0] = entList[occup]; + // add scans + if (scanList != RNIL) + addScanList(node, 0, scanList); + // fix prefix + if (true) + setNodePref(signal, node); +} + +void +Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos) +{ + const unsigned occup = node.getOccup(); + ScanOpPtr scanPtr; scanPtr.i = node.getNodeScan(); - while (scanPtr.i != RNIL) { + do { jam(); c_scanOpPool.getPtr(scanPtr); TreePos& scanPos = scanPtr.p->m_scanPos; @@ -367,22 +384,7 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) scanPos.m_pos++; } scanPtr.i = scanPtr.p->m_nodeScan; - } - // fix node - TreeEnt* const entList = tree.getEntList(node.m_node); - entList[occup] = entList[0]; - TreeEnt* const tmpList = entList + 1; - TreeEnt newMin = ent; - ent = tmpList[pos]; - for (unsigned i = pos; i > 0; i--) { - jam(); - tmpList[i] = tmpList[i - 1]; - } - tmpList[0] = newMin; - entList[0] = entList[occup]; - // fix prefix - if (true) - setNodePref(signal, node); + } while (scanPtr.i != RNIL); } /* @@ -397,12 +399,108 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig ndbrequire(i <= 1); while (cnt != 0) { TreeEnt ent; - nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent); - nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent); + Uint32 scanList = RNIL; + nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList); + nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList); cnt--; } } +// scans linked to node + + +/* + * Add list of scans to node at given position. + */ +void +Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList) +{ + ScanOpPtr scanPtr; + scanPtr.i = scanList; + do { + jam(); + c_scanOpPool.getPtr(scanPtr); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "To pos=" << pos << " " << node << endl; + } +#endif + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + scanPtr.p->m_nodeScan = RNIL; + linkScan(node, scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + // set position but leave direction alone + scanPos.m_loc = node.m_loc; + scanPos.m_pos = pos; + scanPtr.i = nextPtrI; + } while (scanPtr.i != RNIL); +} + +/* + * Remove list of scans from node at given position. The return + * location must point to existing list (in fact RNIL always). + */ +void +Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList) +{ + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + TreePos& scanPos = scanPtr.p->m_scanPos; + ndbrequire(scanPos.m_loc == node.m_loc); + if (scanPos.m_pos == pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "Fron pos=" << pos << " " << node << endl; + } +#endif + unlinkScan(node, scanPtr); + scanPtr.p->m_nodeScan = scanList; + scanList = scanPtr.i; + // unset position but leave direction alone + scanPos.m_loc = NullTupLoc; + scanPos.m_pos = ZNIL; + } + scanPtr.i = nextPtrI; + } while (scanPtr.i != RNIL); +} + +/* + * Move list of scans away from entry about to be removed. Uses scan + * method scanNext(). + */ +void +Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos) +{ + ScanOpPtr scanPtr; + scanPtr.i = node.getNodeScan(); + do { + jam(); + c_scanOpPool.getPtr(scanPtr); + TreePos& scanPos = scanPtr.p->m_scanPos; + const Uint32 nextPtrI = scanPtr.p->m_nodeScan; + ndbrequire(scanPos.m_loc == node.m_loc); + if (scanPos.m_pos == pos) { + jam(); +#ifdef VM_TRACE + if (debugFlags & DebugScan) { + debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl; + debugOut << "At pos=" << pos << " " << node << endl; + } +#endif + scanNext(signal, scanPtr); + ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos)); + } + scanPtr.i = nextPtrI; + } while (scanPtr.i != RNIL); +} + /* * Link scan to the list under the node. The list is single-linked and * ordering does not matter. diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp index 585d2dbe58a..2235b0b80b6 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp @@ -716,7 +716,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) /* * Move to next entry. The scan is already linked to some node. When - * we leave, if any entry was found, it will be linked to a possibly + * we leave, if an entry was found, it will be linked to a possibly * different node. The scan has a position, and a direction which tells * from where we came to this position. This is one of: * @@ -725,6 +725,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) * 2 - up from root (the scan ends) * 3 - left to right within node (at end proceed to right child) * 4 - down from parent (proceed to left child) + * + * If an entry was found, scan direction is 3. Therefore tree + * re-organizations need not worry about scan direction. */ void Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) @@ -739,9 +742,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) if (scan.m_state == ScanOp::Locked) { jam(); // version of a tuple locked by us cannot disappear (assert only) -#ifdef dbtux_wl_1942_is_done ndbassert(false); -#endif AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); lockReq->returnCode = RNIL; lockReq->requestInfo = AccLockReq::Unlock; @@ -864,6 +865,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) scan.m_scanPos = pos; // relink if (scan.m_state == ScanOp::Current) { + ndbrequire(pos.m_match == true && pos.m_dir == 3); ndbrequire(pos.m_loc == node.m_loc); if (origNode.m_loc != node.m_loc) { jam(); diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp index 0451c50c248..392e571debe 100644 --- a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp +++ b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp @@ -34,7 +34,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) if (node.getOccup() < tree.m_maxOccup) { // node has room jam(); - nodePushUp(signal, node, pos, ent); + nodePushUp(signal, node, pos, ent, RNIL); return; } treeAddFull(signal, frag, node, pos, ent); @@ -42,7 +42,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) } jam(); insertNode(signal, node); - nodePushUp(signal, node, 0, ent); + nodePushUp(signal, node, 0, ent, RNIL); node.setSide(2); tree.m_root = node.m_loc; } @@ -68,13 +68,14 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, if (glbNode.getOccup() < tree.m_maxOccup) { // g.l.b node has room jam(); + Uint32 scanList = RNIL; if (pos != 0) { jam(); // add the new entry and return min entry - nodePushDown(signal, lubNode, pos - 1, ent); + nodePushDown(signal, lubNode, pos - 1, ent, scanList); } // g.l.b node receives min entry from l.u.b node - nodePushUp(signal, glbNode, glbNode.getOccup(), ent); + nodePushUp(signal, glbNode, glbNode.getOccup(), ent, scanList); return; } treeAddNode(signal, frag, lubNode, pos, ent, glbNode, 1); @@ -97,13 +98,14 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, parentNode.setLink(i, glbNode.m_loc); glbNode.setLink(2, parentNode.m_loc); glbNode.setSide(i); + Uint32 scanList = RNIL; if (pos != 0) { jam(); // add the new entry and return min entry - nodePushDown(signal, lubNode, pos - 1, ent); + nodePushDown(signal, lubNode, pos - 1, ent, scanList); } // g.l.b node receives min entry from l.u.b node - nodePushUp(signal, glbNode, 0, ent); + nodePushUp(signal, glbNode, 0, ent, scanList); // re-balance the tree treeAddRebalance(signal, frag, parentNode, i); } @@ -179,7 +181,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) if (node.getOccup() > tree.m_minOccup) { // no underflow in any node type jam(); - nodePopDown(signal, node, pos, ent); + nodePopDown(signal, node, pos, ent, 0); return; } if (node.getChilds() == 2) { @@ -189,7 +191,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) return; } // remove entry in semi/leaf - nodePopDown(signal, node, pos, ent); + nodePopDown(signal, node, pos, ent, 0); if (node.getLink(0) != NullTupLoc) { jam(); treeRemoveSemi(signal, frag, node, 0); @@ -222,8 +224,9 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned loc = glbNode.getLink(1); } while (loc != NullTupLoc); // borrow max entry from semi/leaf - nodePopDown(signal, glbNode, glbNode.getOccup() - 1, ent); - nodePopUp(signal, lubNode, pos, ent); + Uint32 scanList = RNIL; + nodePopDown(signal, glbNode, glbNode.getOccup() - 1, ent, &scanList); + nodePopUp(signal, lubNode, pos, ent, scanList); if (glbNode.getLink(0) != NullTupLoc) { jam(); treeRemoveSemi(signal, frag, glbNode, 0); diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt index 5d45cfa15e5..bf466d12604 100644 --- a/ndb/src/kernel/blocks/dbtux/Times.txt +++ b/ndb/src/kernel/blocks/dbtux/Times.txt @@ -129,4 +129,7 @@ optim 17 mc02/a 35 ms 52 ms 49 pct [ allow slack (2) in interior nodes - almost no effect?? ] +wl-1942 mc02/a 35 ms 52 ms 49 pct + mc02/b 42 ms 75 ms 76 pct + vim: set et: |