summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
authorunknown <pekka@mysql.com>2004-10-17 13:12:25 +0200
committerunknown <pekka@mysql.com>2004-10-17 13:12:25 +0200
commitaa6fb64662bbdf34fbb9251ad7ceaf0c5eb46497 (patch)
treeab83f1231837a98c942404f76767cdf59dfbdfe8 /ndb
parentd49b8ac4bb1d652f0228c44f7d5ab5f8de9c2989 (diff)
downloadmariadb-git-aa6fb64662bbdf34fbb9251ad7ceaf0c5eb46497.tar.gz
NDB wl-1942 dbtux - move scans correctly at index tree re-org
Diffstat (limited to 'ndb')
-rw-r--r--ndb/src/kernel/blocks/dbtux/Dbtux.hpp15
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp368
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp8
-rw-r--r--ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp23
-rw-r--r--ndb/src/kernel/blocks/dbtux/Times.txt3
5 files changed, 265 insertions, 152 deletions
diff --git a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
index 780ac55ff4e..144fc3320b8 100644
--- a/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
+++ b/ndb/src/kernel/blocks/dbtux/Dbtux.hpp
@@ -587,12 +587,19 @@ private:
void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node);
// node operations
- void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
- void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
- void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
- void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
+ void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
+ void nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos);
+ void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
+ void nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos);
+ void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
+ void nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos);
+ void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
+ void nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node
+ void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
+ void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
+ void moveScanList(Signal* signal, NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
index b4a30c70f19..491c8a4c74f 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
@@ -117,18 +117,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* v
* A B C D E _ _ => A B C X D E _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Add list of scans at the new entry.
*/
void
-Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent)
+Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup < tree.m_maxOccup && pos <= occup);
- // fix scans
+ // fix old scans
+ if (node.getNodeScan() != RNIL)
+ nodePushUpScans(signal, node, pos);
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ for (unsigned i = occup; i > pos; i--) {
+ jam();
+ tmpList[i] = tmpList[i - 1];
+ }
+ tmpList[pos] = ent;
+ entList[0] = entList[occup + 1];
+ node.setOccup(occup + 1);
+ // add new scans
+ if (scanList != RNIL)
+ addScanList(node, pos, scanList);
+ // fix prefix
+ if (occup == 0 || pos == 0)
+ setNodePref(signal, node);
+}
+
+void
+Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
@@ -144,21 +171,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
scanPos.m_pos++;
}
scanPtr.i = scanPtr.p->m_nodeScan;
- }
- // fix node
- TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- for (unsigned i = occup; i > pos; i--) {
- jam();
- tmpList[i] = tmpList[i - 1];
- }
- tmpList[pos] = ent;
- entList[0] = entList[occup + 1];
- node.setOccup(occup + 1);
- // fix prefix
- if (occup == 0 || pos == 0)
- setNodePref(signal, node);
+ } while (scanPtr.i != RNIL);
}
/*
@@ -169,42 +182,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
* ^ ^
* A B C D E F _ => A B C E F _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Scans at removed entry are returned if non-zero location is passed or
+ * else moved forward.
*/
void
-Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
+Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
- ScanOpPtr scanPtr;
- // move scans whose entry disappears
- scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ if (node.getNodeScan() != RNIL) {
+ // remove or move scans at this position
+ if (scanList == 0)
+ moveScanList(signal, node, pos);
+ else
+ removeScanList(node, pos, *scanList);
+ // fix other scans
+ if (node.getNodeScan() != RNIL)
+ nodePopDownScans(signal, node, pos);
+ }
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ ent = tmpList[pos];
+ for (unsigned i = pos; i < occup - 1; i++) {
jam();
- c_scanOpPool.getPtr(scanPtr);
- TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
- const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
- if (scanPos.m_pos == pos) {
- jam();
-#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
- debugOut << "At popDown pos=" << pos << " " << node << endl;
- }
-#endif
- scanNext(signal, scanPtr);
- }
- scanPtr.i = nextPtrI;
+ tmpList[i] = tmpList[i + 1];
}
- // fix other scans
+ entList[0] = entList[occup - 1];
+ node.setOccup(occup - 1);
+ // fix prefix
+ if (occup != 1 && pos == 0)
+ setNodePref(signal, node);
+}
+
+void
+Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
+ // handled before
ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) {
jam();
@@ -217,21 +243,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos--;
}
scanPtr.i = scanPtr.p->m_nodeScan;
- }
- // fix node
- TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- ent = tmpList[pos];
- for (unsigned i = pos; i < occup - 1; i++) {
- jam();
- tmpList[i] = tmpList[i + 1];
- }
- entList[0] = entList[occup - 1];
- node.setOccup(occup - 1);
- // fix prefix
- if (occup != 1 && pos == 0)
- setNodePref(signal, node);
+ } while (scanPtr.i != RNIL);
}
/*
@@ -242,43 +254,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
* ^ v ^
* A B C D E _ _ => B C D X E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Return list of scans at the removed position 0.
*/
void
-Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
+Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
- ScanOpPtr scanPtr;
- // move scans whose entry disappears
- scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ if (node.getNodeScan() != RNIL) {
+ // remove scans at 0
+ removeScanList(node, 0, scanList);
+ // fix other scans
+ if (node.getNodeScan() != RNIL)
+ nodePushDownScans(signal, node, pos);
+ }
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ TreeEnt oldMin = tmpList[0];
+ for (unsigned i = 0; i < pos; i++) {
jam();
- c_scanOpPool.getPtr(scanPtr);
- TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
- const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
- if (scanPos.m_pos == 0) {
- jam();
-#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
- debugOut << "At pushDown pos=" << pos << " " << node << endl;
- }
-#endif
- // here we may miss a valid entry "X" XXX known bug
- scanNext(signal, scanPtr);
- }
- scanPtr.i = nextPtrI;
+ tmpList[i] = tmpList[i + 1];
}
- // fix other scans
+ tmpList[pos] = ent;
+ ent = oldMin;
+ entList[0] = entList[occup];
+ // fix prefix
+ if (true)
+ setNodePref(signal, node);
+}
+
+void
+Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
+ // handled before
ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) {
jam();
@@ -291,22 +312,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
scanPos.m_pos--;
}
scanPtr.i = scanPtr.p->m_nodeScan;
- }
- // fix node
- TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- TreeEnt oldMin = tmpList[0];
- for (unsigned i = 0; i < pos; i++) {
- jam();
- tmpList[i] = tmpList[i + 1];
- }
- tmpList[pos] = ent;
- ent = oldMin;
- entList[0] = entList[occup];
- // fix prefix
- if (true)
- setNodePref(signal, node);
+ } while (scanPtr.i != RNIL);
}
/*
@@ -318,39 +324,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
* v ^ ^
* A B C D E _ _ => X A B C E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
+ *
+ * Move scans at removed entry and add scans at the new entry.
*/
void
-Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
+Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
- ScanOpPtr scanPtr;
- // move scans whose entry disappears
- scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ if (node.getNodeScan() != RNIL) {
+ // move scans whose entry disappears
+ moveScanList(signal, node, pos);
+ // fix other scans
+ if (node.getNodeScan() != RNIL)
+ nodePopUpScans(signal, node, pos);
+ }
+ // fix node
+ TreeEnt* const entList = tree.getEntList(node.m_node);
+ entList[occup] = entList[0];
+ TreeEnt* const tmpList = entList + 1;
+ TreeEnt newMin = ent;
+ ent = tmpList[pos];
+ for (unsigned i = pos; i > 0; i--) {
jam();
- c_scanOpPool.getPtr(scanPtr);
- TreePos& scanPos = scanPtr.p->m_scanPos;
- ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
- const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
- if (scanPos.m_pos == pos) {
- jam();
-#ifdef VM_TRACE
- if (debugFlags & DebugScan) {
- debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
- debugOut << "At popUp pos=" << pos << " " << node << endl;
- }
-#endif
- // here we may miss a valid entry "X" XXX known bug
- scanNext(signal, scanPtr);
- }
- scanPtr.i = nextPtrI;
+ tmpList[i] = tmpList[i - 1];
}
- // fix other scans
+ tmpList[0] = newMin;
+ entList[0] = entList[occup];
+ // add scans
+ if (scanList != RNIL)
+ addScanList(node, 0, scanList);
+ // fix prefix
+ if (true)
+ setNodePref(signal, node);
+}
+
+void
+Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos)
+{
+ const unsigned occup = node.getOccup();
+ ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
- while (scanPtr.i != RNIL) {
+ do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
@@ -367,22 +384,7 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos++;
}
scanPtr.i = scanPtr.p->m_nodeScan;
- }
- // fix node
- TreeEnt* const entList = tree.getEntList(node.m_node);
- entList[occup] = entList[0];
- TreeEnt* const tmpList = entList + 1;
- TreeEnt newMin = ent;
- ent = tmpList[pos];
- for (unsigned i = pos; i > 0; i--) {
- jam();
- tmpList[i] = tmpList[i - 1];
- }
- tmpList[0] = newMin;
- entList[0] = entList[occup];
- // fix prefix
- if (true)
- setNodePref(signal, node);
+ } while (scanPtr.i != RNIL);
}
/*
@@ -397,12 +399,108 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig
ndbrequire(i <= 1);
while (cnt != 0) {
TreeEnt ent;
- nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent);
- nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent);
+ Uint32 scanList = RNIL;
+ nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
+ nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
cnt--;
}
}
+// scans linked to node
+
+
+/*
+ * Add list of scans to node at given position.
+ */
+void
+Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList)
+{
+ ScanOpPtr scanPtr;
+ scanPtr.i = scanList;
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "To pos=" << pos << " " << node << endl;
+ }
+#endif
+ const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
+ scanPtr.p->m_nodeScan = RNIL;
+ linkScan(node, scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ // set position but leave direction alone
+ scanPos.m_loc = node.m_loc;
+ scanPos.m_pos = pos;
+ scanPtr.i = nextPtrI;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Remove list of scans from node at given position. The return
+ * location must point to existing list (in fact RNIL always).
+ */
+void
+Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
+{
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ ndbrequire(scanPos.m_loc == node.m_loc);
+ if (scanPos.m_pos == pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "Fron pos=" << pos << " " << node << endl;
+ }
+#endif
+ unlinkScan(node, scanPtr);
+ scanPtr.p->m_nodeScan = scanList;
+ scanList = scanPtr.i;
+ // unset position but leave direction alone
+ scanPos.m_loc = NullTupLoc;
+ scanPos.m_pos = ZNIL;
+ }
+ scanPtr.i = nextPtrI;
+ } while (scanPtr.i != RNIL);
+}
+
+/*
+ * Move list of scans away from entry about to be removed. Uses scan
+ * method scanNext().
+ */
+void
+Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos)
+{
+ ScanOpPtr scanPtr;
+ scanPtr.i = node.getNodeScan();
+ do {
+ jam();
+ c_scanOpPool.getPtr(scanPtr);
+ TreePos& scanPos = scanPtr.p->m_scanPos;
+ const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
+ ndbrequire(scanPos.m_loc == node.m_loc);
+ if (scanPos.m_pos == pos) {
+ jam();
+#ifdef VM_TRACE
+ if (debugFlags & DebugScan) {
+ debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
+ debugOut << "At pos=" << pos << " " << node << endl;
+ }
+#endif
+ scanNext(signal, scanPtr);
+ ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
+ }
+ scanPtr.i = nextPtrI;
+ } while (scanPtr.i != RNIL);
+}
+
/*
* Link scan to the list under the node. The list is single-linked and
* ordering does not matter.
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
index 585d2dbe58a..2235b0b80b6 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
@@ -716,7 +716,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
/*
* Move to next entry. The scan is already linked to some node. When
- * we leave, if any entry was found, it will be linked to a possibly
+ * we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of:
*
@@ -725,6 +725,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* 2 - up from root (the scan ends)
* 3 - left to right within node (at end proceed to right child)
* 4 - down from parent (proceed to left child)
+ *
+ * If an entry was found, scan direction is 3. Therefore tree
+ * re-organizations need not worry about scan direction.
*/
void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
@@ -739,9 +742,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (scan.m_state == ScanOp::Locked) {
jam();
// version of a tuple locked by us cannot disappear (assert only)
-#ifdef dbtux_wl_1942_is_done
ndbassert(false);
-#endif
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
@@ -864,6 +865,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos;
// relink
if (scan.m_state == ScanOp::Current) {
+ ndbrequire(pos.m_match == true && pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) {
jam();
diff --git a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
index 0451c50c248..392e571debe 100644
--- a/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
+++ b/ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
@@ -34,7 +34,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
if (node.getOccup() < tree.m_maxOccup) {
// node has room
jam();
- nodePushUp(signal, node, pos, ent);
+ nodePushUp(signal, node, pos, ent, RNIL);
return;
}
treeAddFull(signal, frag, node, pos, ent);
@@ -42,7 +42,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
}
jam();
insertNode(signal, node);
- nodePushUp(signal, node, 0, ent);
+ nodePushUp(signal, node, 0, ent, RNIL);
node.setSide(2);
tree.m_root = node.m_loc;
}
@@ -68,13 +68,14 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
if (glbNode.getOccup() < tree.m_maxOccup) {
// g.l.b node has room
jam();
+ Uint32 scanList = RNIL;
if (pos != 0) {
jam();
// add the new entry and return min entry
- nodePushDown(signal, lubNode, pos - 1, ent);
+ nodePushDown(signal, lubNode, pos - 1, ent, scanList);
}
// g.l.b node receives min entry from l.u.b node
- nodePushUp(signal, glbNode, glbNode.getOccup(), ent);
+ nodePushUp(signal, glbNode, glbNode.getOccup(), ent, scanList);
return;
}
treeAddNode(signal, frag, lubNode, pos, ent, glbNode, 1);
@@ -97,13 +98,14 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
parentNode.setLink(i, glbNode.m_loc);
glbNode.setLink(2, parentNode.m_loc);
glbNode.setSide(i);
+ Uint32 scanList = RNIL;
if (pos != 0) {
jam();
// add the new entry and return min entry
- nodePushDown(signal, lubNode, pos - 1, ent);
+ nodePushDown(signal, lubNode, pos - 1, ent, scanList);
}
// g.l.b node receives min entry from l.u.b node
- nodePushUp(signal, glbNode, 0, ent);
+ nodePushUp(signal, glbNode, 0, ent, scanList);
// re-balance the tree
treeAddRebalance(signal, frag, parentNode, i);
}
@@ -179,7 +181,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
if (node.getOccup() > tree.m_minOccup) {
// no underflow in any node type
jam();
- nodePopDown(signal, node, pos, ent);
+ nodePopDown(signal, node, pos, ent, 0);
return;
}
if (node.getChilds() == 2) {
@@ -189,7 +191,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
return;
}
// remove entry in semi/leaf
- nodePopDown(signal, node, pos, ent);
+ nodePopDown(signal, node, pos, ent, 0);
if (node.getLink(0) != NullTupLoc) {
jam();
treeRemoveSemi(signal, frag, node, 0);
@@ -222,8 +224,9 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned
loc = glbNode.getLink(1);
} while (loc != NullTupLoc);
// borrow max entry from semi/leaf
- nodePopDown(signal, glbNode, glbNode.getOccup() - 1, ent);
- nodePopUp(signal, lubNode, pos, ent);
+ Uint32 scanList = RNIL;
+ nodePopDown(signal, glbNode, glbNode.getOccup() - 1, ent, &scanList);
+ nodePopUp(signal, lubNode, pos, ent, scanList);
if (glbNode.getLink(0) != NullTupLoc) {
jam();
treeRemoveSemi(signal, frag, glbNode, 0);
diff --git a/ndb/src/kernel/blocks/dbtux/Times.txt b/ndb/src/kernel/blocks/dbtux/Times.txt
index 5d45cfa15e5..bf466d12604 100644
--- a/ndb/src/kernel/blocks/dbtux/Times.txt
+++ b/ndb/src/kernel/blocks/dbtux/Times.txt
@@ -129,4 +129,7 @@ optim 17 mc02/a 35 ms 52 ms 49 pct
[ allow slack (2) in interior nodes - almost no effect?? ]
+wl-1942 mc02/a 35 ms 52 ms 49 pct
+ mc02/b 42 ms 75 ms 76 pct
+
vim: set et: