diff options
Diffstat (limited to 'storage/ndb/src/kernel/blocks/grep/Grep.cpp')
-rw-r--r-- | storage/ndb/src/kernel/blocks/grep/Grep.cpp | 2010 |
1 files changed, 2010 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/blocks/grep/Grep.cpp b/storage/ndb/src/kernel/blocks/grep/Grep.cpp new file mode 100644 index 00000000000..0527c5415ab --- /dev/null +++ b/storage/ndb/src/kernel/blocks/grep/Grep.cpp @@ -0,0 +1,2010 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "Grep.hpp" +#include <ndb_version.h> + +#include <NdbTCP.h> +#include <Bitmask.hpp> + +#include <signaldata/NodeFailRep.hpp> +#include <signaldata/ReadNodesConf.hpp> +#include <signaldata/CheckNodeGroups.hpp> +#include <signaldata/GrepImpl.hpp> +#include <signaldata/RepImpl.hpp> +#include <signaldata/EventReport.hpp> +#include <signaldata/DictTabInfo.hpp> +#include <signaldata/GetTabInfo.hpp> +#include <signaldata/WaitGCP.hpp> +#include <GrepEvent.hpp> +#include <AttributeHeader.hpp> + +#define CONTINUEB_DELAY 500 +#define SSREPBLOCKNO 2 +#define PSREPBLOCKNO 2 + +//#define DEBUG_GREP +//#define DEBUG_GREP_SUBSCRIPTION +//#define DEBUG_GREP_TRANSFER +//#define DEBUG_GREP_APPLY +//#define DEBUG_GREP_DELETE + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: STARTUP of GREP Block, etc + * ------------------------------------------------------------------------ + **************************************************************************/ +static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE; +void +Grep::getNodeGroupMembers(Signal* signal) { + jam(); + /** + * Ask DIH for nodeGroupMembers + */ + CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend(); + sd->blockRef = reference(); + sd->requestType = + CheckNodeGroups::Direct | + CheckNodeGroups::GetNodeGroupMembers; + sd->nodeId = getOwnNodeId(); + EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal, + CheckNodeGroups::SignalLength); + jamEntry(); + + c_nodeGroup = sd->output; + c_noNodesInGroup = 0; + for (int i = 0; i < MAX_NDB_NODES; i++) { + if (sd->mask.get(i)) { + if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup; + c_nodesInGroup[c_noNodesInGroup] = i; + c_noNodesInGroup++; + } + } + ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup + +#ifdef NODEFAIL_DEBUG + for (Uint32 i = 0; i < c_noNodesInGroup; i++) { + ndbout_c ("Grep: NodeGroup %u, me %u, me in group %u, member[%u] %u", + c_nodeGroup, getOwnNodeId(), c_idInNodeGroup, + i, c_nodesInGroup[i]); + } +#endif +} + + +void +Grep::execSTTOR(Signal* signal) +{ + jamEntry(); + const Uint32 startphase = signal->theData[1]; + const Uint32 typeOfStart = signal->theData[7]; + if (startphase == 3) + { + jam(); + signal->theData[0] = reference(); + g_TypeOfStart = typeOfStart; + sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB); + return; + } + if(startphase == 5) { + jam(); + /** + * we don't want any log/meta records comming to use + * until we are done with the recovery. + */ + if (g_TypeOfStart == NodeState::ST_NODE_RESTART) { + jam(); + pspart.m_recoveryMode = true; + getNodeGroupMembers(signal); + for (Uint32 i = 0; i < c_noNodesInGroup; i++) { + Uint32 ref =numberToRef(GREP, c_nodesInGroup[i]); + if (ref != reference()) + sendSignal(ref, GSN_GREP_START_ME, signal, + 1 /*SumaStartMe::SignalLength*/, JBB); + } + } else pspart.m_recoveryMode = false; + + } + + if(startphase == 7) { + jam(); + if (g_TypeOfStart == NodeState::ST_NODE_RESTART) { + pspart.m_recoveryMode = false; + } + } + + sendSTTORRY(signal); +} + + +void +Grep::PSPart::execSTART_ME(Signal* signal) +{ + jamEntry(); + GrepStartMe * me =(GrepStartMe*)signal->getDataPtr(); + BlockReference ref = me->senderRef; + GrepAddSubReq* const addReq = (GrepAddSubReq *)signal->getDataPtr(); + + + SubscriptionPtr subPtr; + c_subscriptions.first(c_subPtr); + for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) { + jam(); + subPtr.i = c_subPtr.curr.i; + subPtr.p = c_subscriptions.getPtr(subPtr.i); + addReq->subscriptionId = subPtr.p->m_subscriptionId; + addReq->subscriptionKey = subPtr.p->m_subscriptionKey; + addReq->subscriberData = subPtr.p->m_subscriberData; + addReq->subscriptionType = subPtr.p->m_subscriptionType; + addReq->senderRef = subPtr.p->m_coordinatorRef; + addReq->subscriberRef =subPtr.p->m_subscriberRef; + + sendSignal(ref, + GSN_GREP_ADD_SUB_REQ, + signal, + GrepAddSubReq::SignalLength, + JBB); + } + + addReq->subscriptionId = 0; + addReq->subscriptionKey = 0; + addReq->subscriberData = 0; + addReq->subscriptionType = 0; + addReq->senderRef = 0; + addReq->subscriberRef = 0; + + sendSignal(ref, + GSN_GREP_ADD_SUB_REQ, + signal, + GrepAddSubReq::SignalLength, + JBB); +} + +void +Grep::PSPart::execGREP_ADD_SUB_REQ(Signal* signal) +{ + jamEntry(); + GrepAddSubReq * const grepReq = (GrepAddSubReq *)signal->getDataPtr(); + const Uint32 subId = grepReq->subscriptionId; + const Uint32 subKey = grepReq->subscriptionKey; + const Uint32 subData = grepReq->subscriberData; + const Uint32 subType = grepReq->subscriptionType; + const Uint32 coordinatorRef = grepReq->senderRef; + + /** + * this is ref to the REP node for this subscription. + */ + const Uint32 subRef = grepReq->subscriberRef; + + if(subId!=0 && subKey!=0) { + jam(); + SubscriptionPtr subPtr; + ndbrequire( c_subscriptionPool.seize(subPtr)); + subPtr.p->m_coordinatorRef = coordinatorRef; + subPtr.p->m_subscriptionId = subId; + subPtr.p->m_subscriptionKey = subKey; + subPtr.p->m_subscriberRef = subRef; + subPtr.p->m_subscriberData = subData; + subPtr.p->m_subscriptionType = subType; + + c_subscriptions.add(subPtr); + } + else { + jam(); + GrepAddSubConf * conf = (GrepAddSubConf *)grepReq; + conf->noOfSub = + c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree(); + sendSignal(signal->getSendersBlockRef(), + GSN_GREP_ADD_SUB_CONF, + signal, + GrepAddSubConf::SignalLength, + JBB); + } +} + +void +Grep::PSPart::execGREP_ADD_SUB_REF(Signal* signal) +{ + /** + * @todo fix error stuff + */ +} + +void +Grep::PSPart::execGREP_ADD_SUB_CONF(Signal* signal) +{ + jamEntry(); + GrepAddSubConf* const conf = (GrepAddSubConf *)signal->getDataPtr(); + Uint32 noOfSubscriptions = conf->noOfSub; + Uint32 noOfRestoredSubscriptions = + c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree(); + if(noOfSubscriptions!=noOfRestoredSubscriptions) { + jam(); + /** + *@todo send ref signal + */ + ndbrequire(false); + } +} + +void +Grep::execREAD_NODESCONF(Signal* signal) +{ + jamEntry(); + ReadNodesConf * conf = (ReadNodesConf *)signal->getDataPtr(); + +#if 0 + ndbout_c("Grep: Recd READ_NODESCONF"); +#endif + + /****************************** + * Check which REP nodes exist + ******************************/ + Uint32 i; + for (i = 1; i < MAX_NODES; i++) + { + jam(); +#if 0 + ndbout_c("Grep: Found node %d of type %d", i, getNodeInfo(i).getType()); +#endif + if (getNodeInfo(i).getType() == NodeInfo::REP) + { + jam(); + /** + * @todo This should work for more than ONE rep node! + */ + pscoord.m_repRef = numberToRef(PSREPBLOCKNO, i); + pspart.m_repRef = numberToRef(PSREPBLOCKNO, i); +#if 0 + ndbout_c("Grep: REP node %d detected", i); +#endif + } + } + + /***************************** + * Check which DB nodes exist + *****************************/ + m_aliveNodes.clear(); + + Uint32 count = 0; + for(i = 0; i<MAX_NDB_NODES; i++) + { + if (NodeBitmask::get(conf->allNodes, i)) + { + jam(); + count++; + + NodePtr node; + ndbrequire(m_nodes.seize(node)); + + node.p->nodeId = i; + if (NodeBitmask::get(conf->inactiveNodes, i)) + { + node.p->alive = 0; + } + else + { + node.p->alive = 1; + m_aliveNodes.set(i); + } + } + } + m_masterNodeId = conf->masterNodeId; + ndbrequire(count == conf->noOfNodes); + sendSTTORRY(signal); +} + +void +Grep::sendSTTORRY(Signal* signal) +{ + signal->theData[0] = 0; + signal->theData[3] = 1; + signal->theData[4] = 3; + signal->theData[5] = 5; + signal->theData[6] = 7; + signal->theData[7] = 255; // No more start phases from missra + sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB); +} + +void +Grep::execNDB_STTOR(Signal* signal) +{ + jamEntry(); +} + +void +Grep::execDUMP_STATE_ORD(Signal* signal) +{ + jamEntry(); + //Uint32 tCase = signal->theData[0]; + +#if 0 + if(sscoord.m_repRef == 0) + { + ndbout << "Grep: Recd DUMP signal but has no connection with REP node" + << endl; + return; + } +#endif + + /* + switch (tCase) + { + case 8100: sscoord.grepReq(signal, GrepReq::START_SUBSCR); break; + case 8102: sscoord.grepReq(signal, GrepReq::START_METALOG); break; + case 8104: sscoord.grepReq(signal, GrepReq::START_METASCAN); break; + case 8106: sscoord.grepReq(signal, GrepReq::START_DATALOG); break; + case 8108: sscoord.grepReq(signal, GrepReq::START_DATASCAN); break; + case 8110: sscoord.grepReq(signal, GrepReq::STOP_SUBSCR); break; + case 8500: sscoord.grepReq(signal, GrepReq::REMOVE_BUFFERS); break; + case 8300: sscoord.grepReq(signal, GrepReq::SLOWSTOP); break; + case 8400: sscoord.grepReq(signal, GrepReq::FASTSTOP); break; + case 8600: sscoord.grepReq(signal, GrepReq::CREATE_SUBSCR); break; + case 8700: sscoord.dropTable(signal,(Uint32)signal->theData[1]);break; + default: break; + } + */ +} + +/** + * Signal received when REP node has failed + */ +void +Grep::execAPI_FAILREQ(Signal* signal) +{ + jamEntry(); + //Uint32 failedApiNode = signal->theData[0]; + //BlockReference retRef = signal->theData[1]; + + /** + * @todo We should probably do something smart if the + * PS REP node fails???? /Lars + */ + +#if 0 + ndbout_c("Grep: API_FAILREQ received for API node %d.", failedApiNode); +#endif + + /** + * @note This signal received is NOT allowed to send any CONF + * signal, since this would screw up TC/DICT to API + * "connections". + */ +} + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: GREP Control + * ------------------------------------------------------------------------ + **************************************************************************/ +void +Grep::execGREP_REQ(Signal* signal) +{ + jamEntry(); + + //GrepReq * req = (GrepReq *)signal->getDataPtr(); + + /** + * @todo Fix so that request is redirected to REP Server + * Obsolete? + * Was: sscoord.grepReq(signal, req->request); + */ + ndbout_c("Warning! REP commands can only be executed at REP SERVER prompt!"); +} + + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: NODE STATE HANDLING + * ------------------------------------------------------------------------ + **************************************************************************/ +void +Grep::execNODE_FAILREP(Signal* signal) +{ + jamEntry(); + NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr(); + bool changed = false; + + NodePtr nodePtr; + for(m_nodes.first(nodePtr); nodePtr.i != RNIL; m_nodes.next(nodePtr)) + { + jam(); + if (NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)) + { + jam(); + + if (nodePtr.p->alive) + { + jam(); + ndbassert(m_aliveNodes.get(nodePtr.p->nodeId)); + changed = true; + } + else + { + ndbassert(!m_aliveNodes.get(nodePtr.p->nodeId)); + } + + nodePtr.p->alive = 0; + m_aliveNodes.clear(nodePtr.p->nodeId); + } + } + + + /** + * Problem: Fix a node failure running a protocol + * + * 1. Coordinator node of a protocol dies + * - Elect a new coordinator + * - send ref to user + * + * 2. Non-coordinator dies. + * - make coordinator aware of this + * so that coordinator does not wait for + * conf from faulty node + * - node recovery will restore the non-coordinator. + * + */ +} + +void +Grep::execINCL_NODEREQ(Signal* signal) +{ + jamEntry(); + + //const Uint32 senderRef = signal->theData[0]; + const Uint32 inclNode = signal->theData[1]; + + NodePtr node; + for(m_nodes.first(node); node.i != RNIL; m_nodes.next(node)) + { + jam(); + const Uint32 nodeId = node.p->nodeId; + if (inclNode == nodeId) { + jam(); + + ndbrequire(node.p->alive == 0); + ndbassert(!m_aliveNodes.get(nodeId)); + + node.p->alive = 1; + m_aliveNodes.set(nodeId); + + break; + } + } + + /** + * @todo: if we include this DIH's got to be prepared, later if needed... + */ +#if 0 + signal->theData[0] = reference(); + + sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB); +#endif +} + + +/** + * Helper methods + */ +void +Grep::PSCoord::prepareOperationRec(SubCoordinatorPtr subPtr, + BlockReference subscriber, + Uint32 subId, + Uint32 subKey, + Uint32 request) +{ + subPtr.p->m_coordinatorRef = reference(); + subPtr.p->m_subscriberRef = subscriber; + subPtr.p->m_subscriberData = subPtr.i; + subPtr.p->m_subscriptionId = subId; + subPtr.p->m_subscriptionKey = subKey; + subPtr.p->m_outstandingRequest = request; +} + + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: CREATE SUBSCRIPTION ID + * ------------------------------------------------------------------------ + * + * Requests SUMA to create a unique subscription id + **************************************************************************/ + +void +Grep::PSCoord::execGREP_CREATE_SUBID_REQ(Signal* signal) +{ + jamEntry(); + + CreateSubscriptionIdReq * req = + (CreateSubscriptionIdReq*)signal->getDataPtr(); + BlockReference ref = signal->getSendersBlockRef(); + + SubCoordinatorPtr subPtr; + if( !c_subCoordinatorPool.seize(subPtr)) { + jam(); + SubCoordinator sub; + sub.m_subscriberRef = ref; + sub.m_subscriptionId = 0; + sub.m_subscriptionKey = 0; + sendRefToSS(signal, sub, GrepError::SUBSCRIPTION_ID_NOMEM ); + return; + } + prepareOperationRec(subPtr, + ref, + 0,0, + GSN_CREATE_SUBID_REQ); + + + ndbout_c("SUBID_REQ Ref %d",ref); + req->senderData=subPtr.p->m_subscriberData; + + sendSignal(SUMA_REF, GSN_CREATE_SUBID_REQ, signal, + SubCreateReq::SignalLength, JBB); + +#if 1 //def DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSCoord: Sent CREATE_SUBID_REQ to SUMA"); +#endif +} + +void +Grep::PSCoord::execCREATE_SUBID_CONF(Signal* signal) +{ + jamEntry(); + CreateSubscriptionIdConf const * conf = + (CreateSubscriptionIdConf *)signal->getDataPtr(); + Uint32 subId = conf->subscriptionId; + Uint32 subKey = conf->subscriptionKey; + Uint32 subData = conf->subscriberData; + +#if 1 //def DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSCoord: Recd GREP_SUBID_CONF (subId:%d, subKey:%d)", + subId, subKey); +#endif + + SubCoordinatorPtr subPtr; + c_subCoordinatorPool.getPtr(subPtr, subData); + BlockReference repRef = subPtr.p->m_subscriberRef; + + { // Check that id/key is unique + SubCoordinator key; + SubCoordinatorPtr tmp; + key.m_subscriptionId = subId; + key.m_subscriptionKey = subKey; + if(c_runningSubscriptions.find(tmp, key)){ + jam(); + SubCoordinator sub; + sub.m_subscriberRef=repRef; + sub.m_subscriptionId = subId; + sub.m_subscriptionKey = subKey; + sendRefToSS(signal,sub, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE ); + return; + } + } + + sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_CREATE_SUBID_CONF, signal, + CreateSubscriptionIdConf::SignalLength, JBB); + c_subCoordinatorPool.release(subData); + + m_grep->sendEventRep(signal, + NDB_LE_GrepSubscriptionInfo, + GrepEvent::GrepPS_CreateSubIdConf, + subId, + subKey, + (Uint32)GrepError::GE_NO_ERROR); +} + +void +Grep::PSCoord::execCREATE_SUBID_REF(Signal* signal) { + jamEntry(); + CreateSubscriptionIdRef const * ref = + (CreateSubscriptionIdRef *)signal->getDataPtr(); + Uint32 subData = ref->subscriberData; + GrepError::GE_Code err; + + Uint32 sendersBlockRef = signal->getSendersBlockRef(); + if(sendersBlockRef == SUMA_REF) + { + jam(); + err = GrepError::SUBSCRIPTION_ID_SUMA_FAILED_CREATE; + } else { + jam(); + ndbrequire(false); /* Added since errorcode err unhandled + * TODO: fix correct errorcode + */ + err= GrepError::GE_NO_ERROR; // remove compiler warning + } + + SubCoordinatorPtr subPtr; + c_runningSubscriptions.getPtr(subPtr, subData); + BlockReference repref = subPtr.p->m_subscriberRef; + + SubCoordinator sub; + sub.m_subscriberRef = repref; + sub.m_subscriptionId = 0; + sub.m_subscriptionKey = 0; + sendRefToSS(signal,sub, err); + +} + + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: CREATE SUBSCRIPTION + * ------------------------------------------------------------------------ + * + * Creates a subscription for every GREP to its local SUMA. + * GREP node that executes createSubscription becomes the GREP Coord. + **************************************************************************/ + +/** + * Request to create a subscription (sent from SS) + */ +void +Grep::PSCoord::execGREP_SUB_CREATE_REQ(Signal* signal) +{ + jamEntry(); + GrepSubCreateReq const * grepReq = (GrepSubCreateReq *)signal->getDataPtr(); + Uint32 subId = grepReq->subscriptionId; + Uint32 subKey = grepReq->subscriptionKey; + Uint32 subType = grepReq->subscriptionType; + BlockReference rep = signal->getSendersBlockRef(); + + GrepCreateReq * req =(GrepCreateReq*)grepReq; + + SubCoordinatorPtr subPtr; + + if( !c_subCoordinatorPool.seize(subPtr)) { + jam(); + SubCoordinator sub; + sub.m_subscriberRef = rep; + sub.m_subscriptionId = 0; + sub.m_subscriptionKey = 0; + sub.m_outstandingRequest = GSN_GREP_CREATE_REQ; + sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL); + return; + } + prepareOperationRec(subPtr, + numberToRef(PSREPBLOCKNO, refToNode(rep)), subId, subKey, + GSN_GREP_CREATE_REQ); + + /* Get the payload of the signal. + */ + SegmentedSectionPtr selectedTablesPtr; + if(subType == SubCreateReq::SelectiveTableSnapshot) { + jam(); + ndbrequire(signal->getNoOfSections()==1); + signal->getSection(selectedTablesPtr,0); + signal->header.m_noOfSections = 0; + } + /** + * Prepare the signal to be sent to Grep participatns + */ + subPtr.p->m_subscriptionType = subType; + req->senderRef = reference(); + req->subscriberRef = numberToRef(PSREPBLOCKNO, refToNode(rep)); + req->subscriberData = subPtr.p->m_subscriberData; + req->subscriptionId = subId; + req->subscriptionKey = subKey; + req->subscriptionType = subType; + + /*add payload if it is a selectivetablesnap*/ + if(subType == SubCreateReq::SelectiveTableSnapshot) { + jam(); + signal->setSection(selectedTablesPtr, 0); + } + + /****************************** + * Send to all PS participants + ******************************/ + NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes); + subPtr.p->m_outstandingParticipants = rg; + sendSignal(rg, + GSN_GREP_CREATE_REQ, signal, + GrepCreateReq::SignalLength, JBB); + + +#ifdef DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSCoord: Sent GREP_CREATE_REQ " + "(subId:%d, subKey:%d, subData:%d, subType:%d) to parts", + subId, subKey, subPtr.p->m_subscriberData, subType); +#endif +} + +void +Grep::PSPart::execGREP_CREATE_REQ(Signal* signal) +{ + jamEntry(); + GrepCreateReq * const grepReq = (GrepCreateReq *)signal->getDataPtr(); + const Uint32 subId = grepReq->subscriptionId; + const Uint32 subKey = grepReq->subscriptionKey; + const Uint32 subData = grepReq->subscriberData; + const Uint32 subType = grepReq->subscriptionType; + const Uint32 coordinatorRef = grepReq->senderRef; + const Uint32 subRef = grepReq->subscriberRef; //this is ref to the + //REP node for this + //subscription. + + SubscriptionPtr subPtr; + ndbrequire( c_subscriptionPool.seize(subPtr)); + subPtr.p->m_coordinatorRef = coordinatorRef; + subPtr.p->m_subscriptionId = subId; + subPtr.p->m_subscriptionKey = subKey; + subPtr.p->m_subscriberRef = subRef; + subPtr.p->m_subscriberData = subPtr.i; + subPtr.p->m_subscriptionType = subType; + subPtr.p->m_outstandingRequest = GSN_GREP_CREATE_REQ; + subPtr.p->m_operationPtrI = subData; + + c_subscriptions.add(subPtr); + + SegmentedSectionPtr selectedTablesPtr; + if(subType == SubCreateReq::SelectiveTableSnapshot) { + jam(); + ndbrequire(signal->getNoOfSections()==1); + signal->getSection(selectedTablesPtr,0);// SubCreateReq::TABLE_LIST); + signal->header.m_noOfSections = 0; + } + + /** + * Prepare signal to be sent to SUMA + */ + SubCreateReq * sumaReq = (SubCreateReq *)grepReq; + sumaReq->subscriberRef = GREP_REF; + sumaReq->subscriberData = subPtr.p->m_subscriberData; + sumaReq->subscriptionId = subPtr.p->m_subscriptionId; + sumaReq->subscriptionKey = subPtr.p->m_subscriptionKey; + sumaReq->subscriptionType = subPtr.p->m_subscriptionType; + /*add payload if it is a selectivetablesnap*/ + if(subType == SubCreateReq::SelectiveTableSnapshot) { + jam(); + signal->setSection(selectedTablesPtr, 0); + } + sendSignal(SUMA_REF, + GSN_SUB_CREATE_REQ, + signal, + SubCreateReq::SignalLength, + JBB); +} + +void +Grep::PSPart::execSUB_CREATE_CONF(Signal* signal) +{ + jamEntry(); + + SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr(); + Uint32 subData = conf->subscriberData; + + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + /** + @todo check why this can fuck up -johan + + ndbrequire(subPtr.p->m_subscriptionId == conf->subscriptionId); + ndbrequire(subPtr.p->m_subscriptionKey == conf->subscriptionKey); + */ +#ifdef DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSPart: Recd SUB_CREATE_CONF " + "(subId:%d, subKey:%d) from SUMA", + conf->subscriptionId, conf->subscriptionKey); +#endif + + /********************* + * Send conf to coord + *********************/ + GrepCreateConf * grepConf = (GrepCreateConf*)conf; + grepConf->senderNodeId = getOwnNodeId(); + grepConf->senderData = subPtr.p->m_operationPtrI; + sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_CREATE_CONF, signal, + GrepCreateConf::SignalLength, JBB); + subPtr.p->m_outstandingRequest = 0; +} + +/** + * Handle errors that either occured in: + * 1) PSPart + * or + * 2) propagated from local SUMA + */ +void +Grep::PSPart::execSUB_CREATE_REF(Signal* signal) +{ + jamEntry(); + SubCreateRef * const ref = (SubCreateRef *)signal->getDataPtr(); + Uint32 subData = ref->subscriberData; + GrepError::GE_Code err = (GrepError::GE_Code)ref->err; + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + sendRefToPSCoord(signal, *subPtr.p, err /*error*/); + subPtr.p->m_outstandingRequest = 0; +} + +void +Grep::PSCoord::execGREP_CREATE_CONF(Signal* signal) +{ + jamEntry(); + GrepCreateConf const * conf = (GrepCreateConf *)signal->getDataPtr(); + Uint32 subData = conf->senderData; + Uint32 nodeId = conf->senderNodeId; + + SubCoordinatorPtr subPtr; + c_subCoordinatorPool.getPtr(subPtr, subData); + + ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_CREATE_REQ); + + subPtr.p->m_outstandingParticipants.clearWaitingFor(nodeId); + + if(!subPtr.p->m_outstandingParticipants.done()) return; + /******************************** + * All participants have CONF:ed + ********************************/ + Uint32 subId = subPtr.p->m_subscriptionId; + Uint32 subKey = subPtr.p->m_subscriptionKey; + + GrepSubCreateConf * grepConf = (GrepSubCreateConf *)signal->getDataPtr(); + grepConf->subscriptionId = subId; + grepConf->subscriptionKey = subKey; + sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_CREATE_CONF, signal, + GrepSubCreateConf::SignalLength, JBB); + + /** + * Send event report + */ + m_grep->sendEventRep(signal, + NDB_LE_GrepSubscriptionInfo, + GrepEvent::GrepPS_SubCreateConf, + subId, + subKey, + (Uint32)GrepError::GE_NO_ERROR); + + c_subCoordinatorPool.release(subPtr); + +} + +/** + * Handle errors that either occured in: + * 1) PSCoord + * or + * 2) propagated from PSPart + */ +void +Grep::PSCoord::execGREP_CREATE_REF(Signal* signal) +{ + jamEntry(); + GrepCreateRef * const ref = (GrepCreateRef *)signal->getDataPtr(); + Uint32 subData = ref->senderData; + Uint32 err = ref->err; + SubCoordinatorPtr subPtr; + c_runningSubscriptions.getPtr(subPtr, subData); + + sendRefToSS(signal, *subPtr.p, (GrepError::GE_Code)err /*error*/); +} + + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: START SUBSCRIPTION + * ------------------------------------------------------------------------ + * + * Starts a subscription at SUMA. + * Each participant starts its own subscription. + **************************************************************************/ + +/** + * Request to start subscription (Sent from SS) + */ +void +Grep::PSCoord::execGREP_SUB_START_REQ(Signal* signal) +{ + jamEntry(); + GrepSubStartReq * const subReq = (GrepSubStartReq *)signal->getDataPtr(); + SubscriptionData::Part part = (SubscriptionData::Part) subReq->part; + Uint32 subId = subReq->subscriptionId; + Uint32 subKey = subReq->subscriptionKey; + BlockReference rep = signal->getSendersBlockRef(); + + SubCoordinatorPtr subPtr; + + if(!c_subCoordinatorPool.seize(subPtr)) { + jam(); + SubCoordinator sub; + sub.m_subscriberRef = rep; + sub.m_subscriptionId = 0; + sub.m_subscriptionKey = 0; + sub.m_outstandingRequest = GSN_GREP_START_REQ; + sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL); + return; + } + + prepareOperationRec(subPtr, + numberToRef(PSREPBLOCKNO, refToNode(rep)), + subId, subKey, + GSN_GREP_START_REQ); + + GrepStartReq * const req = (GrepStartReq *) subReq; + req->part = (Uint32) part; + req->subscriptionId = subPtr.p->m_subscriptionId; + req->subscriptionKey = subPtr.p->m_subscriptionKey; + req->senderData = subPtr.p->m_subscriberData; + + /*************************** + * Send to all participants + ***************************/ + NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes); + subPtr.p->m_outstandingParticipants = rg; + sendSignal(rg, + GSN_GREP_START_REQ, + signal, + GrepStartReq::SignalLength, JBB); + +#ifdef DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSCoord: Sent GREP_START_REQ " + "(subId:%d, subKey:%d, senderData:%d, part:%d) to all participants", + req->subscriptionId, req->subscriptionKey, req->senderData, part); +#endif +} + + +void +Grep::PSPart::execGREP_START_REQ(Signal* signal) +{ + jamEntry(); + GrepStartReq * const grepReq = (GrepStartReq *) signal->getDataPtr(); + SubscriptionData::Part part = (SubscriptionData::Part)grepReq->part; + Uint32 subId = grepReq->subscriptionId; + Uint32 subKey = grepReq->subscriptionKey; + Uint32 operationPtrI = grepReq->senderData; + + Subscription key; + key.m_subscriptionId = subId; + key.m_subscriptionKey = subKey; + SubscriptionPtr subPtr; + ndbrequire(c_subscriptions.find(subPtr, key));; + subPtr.p->m_outstandingRequest = GSN_GREP_START_REQ; + subPtr.p->m_operationPtrI = operationPtrI; + /** + * send SUB_START_REQ to local SUMA + */ + SubStartReq * sumaReq = (SubStartReq *) grepReq; + sumaReq->subscriptionId = subId; + sumaReq->subscriptionKey = subKey; + sumaReq->subscriberData = subPtr.i; + sumaReq->part = (Uint32) part; + + sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal, + SubStartReq::SignalLength, JBB); +#ifdef DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSPart: Sent SUB_START_REQ (subId:%d, subKey:%d, part:%d)", + subId, subKey, (Uint32)part); +#endif +} + + +void +Grep::PSPart::execSUB_START_CONF(Signal* signal) +{ + jamEntry(); + + SubStartConf * const conf = (SubStartConf *) signal->getDataPtr(); + SubscriptionData::Part part = (SubscriptionData::Part)conf->part; + Uint32 subId = conf->subscriptionId; + Uint32 subKey = conf->subscriptionKey; + Uint32 subData = conf->subscriberData; + Uint32 firstGCI = conf->firstGCI; +#ifdef DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSPart: Recd SUB_START_CONF " + "(subId:%d, subKey:%d, subData:%d)", + subId, subKey, subData); +#endif + + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + ndbrequire(subPtr.p->m_subscriptionId == subId); + ndbrequire(subPtr.p->m_subscriptionKey == subKey); + + GrepStartConf * grepConf = (GrepStartConf *)conf; + grepConf->senderData = subPtr.p->m_operationPtrI; + grepConf->part = (Uint32) part; + grepConf->subscriptionKey = subKey; + grepConf->subscriptionId = subId; + grepConf->firstGCI = firstGCI; + grepConf->senderNodeId = getOwnNodeId(); + sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_START_CONF, signal, + GrepStartConf::SignalLength, JBB); + subPtr.p->m_outstandingRequest = 0; + +#ifdef DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSPart: Sent GREP_START_CONF " + "(subId:%d, subKey:%d, subData:%d, part:%d)", + subId, subKey, subData, part); +#endif +} + + +/** + * Handle errors that either occured in: + * 1) PSPart + * or + * 2) propagated from local SUMA + * + * Propagates REF signal to PSCoord + */ +void +Grep::PSPart::execSUB_START_REF(Signal* signal) +{ + SubStartRef * const ref = (SubStartRef *)signal->getDataPtr(); + Uint32 subData = ref->subscriberData; + GrepError::GE_Code err = (GrepError::GE_Code)ref->err; + SubscriptionData::Part part = (SubscriptionData::Part)ref->part; + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + sendRefToPSCoord(signal, *subPtr.p, err /*error*/, part); + subPtr.p->m_outstandingRequest = 0; +} + + +/** + * Logging has started... (says PS Participant) + */ +void +Grep::PSCoord::execGREP_START_CONF(Signal* signal) +{ + jamEntry(); + + GrepStartConf * const conf = (GrepStartConf *) signal->getDataPtr(); + Uint32 subData = conf->senderData; + SubscriptionData::Part part = (SubscriptionData::Part)conf->part; + Uint32 subId = conf->subscriptionId; + Uint32 subKey = conf->subscriptionKey; + Uint32 firstGCI = conf->firstGCI; + + SubCoordinatorPtr subPtr; + c_subCoordinatorPool.getPtr(subPtr, subData); + ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_START_REQ); + + subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId); + + if(!subPtr.p->m_outstandingParticipants.done()) return; + jam(); + + /************************* + * All participants ready + *************************/ + GrepSubStartConf * grepConf = (GrepSubStartConf *) conf; + grepConf->part = part; + grepConf->subscriptionId = subId; + grepConf->subscriptionKey = subKey; + grepConf->firstGCI = firstGCI; + + bool ok = false; + switch(part) { + case SubscriptionData::MetaData: + ok = true; + sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal, + GrepSubStartConf::SignalLength, JBB); + + /** + * Send event report + */ + m_grep->sendEventRep(signal, + NDB_LE_GrepSubscriptionInfo, + GrepEvent::GrepPS_SubStartMetaConf, + subId, subKey, + (Uint32)GrepError::GE_NO_ERROR); + + c_subCoordinatorPool.release(subPtr); + break; + case SubscriptionData::TableData: + ok = true; + sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal, + GrepSubStartConf::SignalLength, JBB); + + /** + * Send event report + */ + m_grep->sendEventRep(signal, + NDB_LE_GrepSubscriptionInfo, + GrepEvent::GrepPS_SubStartDataConf, + subId, subKey, + (Uint32)GrepError::GE_NO_ERROR); + + + c_subCoordinatorPool.release(subPtr); + break; + } + ndbrequire(ok); + +#ifdef DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSCoord: Recd SUB_START_CONF (subId:%d, subKey:%d, part:%d) " + "from all slaves", + subId, subKey, (Uint32)part); +#endif +} + +/** + * Handle errors that either occured in: + * 1) PSCoord + * or + * 2) propagated from PSPart + */ +void +Grep::PSCoord::execGREP_START_REF(Signal* signal) +{ + jamEntry(); + GrepStartRef * const ref = (GrepStartRef *)signal->getDataPtr(); + Uint32 subData = ref->senderData; + GrepError::GE_Code err = (GrepError::GE_Code)ref->err; + SubscriptionData::Part part = (SubscriptionData::Part)ref->part; + + SubCoordinatorPtr subPtr; + c_runningSubscriptions.getPtr(subPtr, subData); + sendRefToSS(signal, *subPtr.p, err /*error*/, part); +} + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: REMOVE SUBSCRIPTION + * ------------------------------------------------------------------------ + * + * Remove a subscription at SUMA. + * Each participant removes its own subscription. + * We start by deleting the subscription inside the requestor + * since, we don't know if nodes (REP nodes or DB nodes) + * have disconnected after we sent out this and + * if we dont delete the sub in the requestor now, + * we won't be able to create a new subscription + **************************************************************************/ + +/** + * Request to abort subscription (Sent from SS) + */ +void +Grep::PSCoord::execGREP_SUB_REMOVE_REQ(Signal* signal) +{ + jamEntry(); + GrepSubRemoveReq * const subReq = (GrepSubRemoveReq *)signal->getDataPtr(); + Uint32 subId = subReq->subscriptionId; + Uint32 subKey = subReq->subscriptionKey; + BlockReference rep = signal->getSendersBlockRef(); + + SubCoordinatorPtr subPtr; + if( !c_subCoordinatorPool.seize(subPtr)) { + jam(); + SubCoordinator sub; + sub.m_subscriberRef = rep; + sub.m_subscriptionId = 0; + sub.m_subscriptionKey = 0; + sub.m_outstandingRequest = GSN_GREP_REMOVE_REQ; + sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL); + return; + } + + + prepareOperationRec(subPtr, + numberToRef(PSREPBLOCKNO, refToNode(rep)), + subId, subKey, + GSN_GREP_REMOVE_REQ); + + c_runningSubscriptions.add(subPtr); + + GrepRemoveReq * req = (GrepRemoveReq *) subReq; + req->subscriptionId = subPtr.p->m_subscriptionId; + req->subscriptionKey = subPtr.p->m_subscriptionKey; + req->senderData = subPtr.p->m_subscriberData; + req->senderRef = subPtr.p->m_coordinatorRef; + + /*************************** + * Send to all participants + ***************************/ + NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes); + subPtr.p->m_outstandingParticipants = rg; + sendSignal(rg, + GSN_GREP_REMOVE_REQ, signal, + GrepRemoveReq::SignalLength, JBB); +} + + +void +Grep::PSPart::execGREP_REMOVE_REQ(Signal* signal) +{ + jamEntry(); + GrepRemoveReq * const grepReq = (GrepRemoveReq *) signal->getDataPtr(); + Uint32 subId = grepReq->subscriptionId; + Uint32 subKey = grepReq->subscriptionKey; + Uint32 subData = grepReq->senderData; + Uint32 coordinator = grepReq->senderRef; + + Subscription key; + key.m_subscriptionId = subId; + key.m_subscriptionKey = subKey; + SubscriptionPtr subPtr; + + if(!c_subscriptions.find(subPtr, key)) + { + /** + * The subscription was not found, so it must be deleted. + * Send CONF back, since it does not exist (thus, it is removed) + */ + GrepRemoveConf * grepConf = (GrepRemoveConf *)grepReq; + grepConf->subscriptionKey = subKey; + grepConf->subscriptionId = subId; + grepConf->senderData = subData; + grepConf->senderNodeId = getOwnNodeId(); + sendSignal(coordinator, GSN_GREP_REMOVE_CONF, signal, + GrepRemoveConf::SignalLength, JBB); + return; + } + + subPtr.p->m_operationPtrI = subData; + subPtr.p->m_coordinatorRef = coordinator; + subPtr.p->m_outstandingRequest = GSN_GREP_REMOVE_REQ; + + /** + * send SUB_REMOVE_REQ to local SUMA + */ + SubRemoveReq * sumaReq = (SubRemoveReq *) grepReq; + sumaReq->subscriptionId = subId; + sumaReq->subscriptionKey = subKey; + sumaReq->senderData = subPtr.i; + sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal, + SubStartReq::SignalLength, JBB); +} + + +/** + * SUB_REMOVE_CONF (from local SUMA) + */ +void +Grep::PSPart::execSUB_REMOVE_CONF(Signal* signal) +{ + jamEntry(); + SubRemoveConf * const conf = (SubRemoveConf *) signal->getDataPtr(); + Uint32 subId = conf->subscriptionId; + Uint32 subKey = conf->subscriptionKey; + Uint32 subData = conf->subscriberData; + + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + ndbrequire(subPtr.p->m_subscriptionId == subId); + ndbrequire(subPtr.p->m_subscriptionKey == subKey); + subPtr.p->m_outstandingRequest = 0; + GrepRemoveConf * grepConf = (GrepRemoveConf *)conf; + grepConf->subscriptionKey = subKey; + grepConf->subscriptionId = subId; + grepConf->senderData = subPtr.p->m_operationPtrI; + grepConf->senderNodeId = getOwnNodeId(); + sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_REMOVE_CONF, signal, + GrepRemoveConf::SignalLength, JBB); + c_subscriptions.release(subPtr); + +} + + +/** + * SUB_REMOVE_CONF (from local SUMA) + */ +void +Grep::PSPart::execSUB_REMOVE_REF(Signal* signal) +{ + jamEntry(); + SubRemoveRef * const ref = (SubRemoveRef *)signal->getDataPtr(); + Uint32 subData = ref->subscriberData; + /* GrepError::GE_Code err = (GrepError::GE_Code)ref->err;*/ + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + + //sendSubRemoveRef_PSCoord(signal, *subPtr.p, err /*error*/); +} + + +/** + * Aborting has been carried out (says Participants) + */ +void +Grep::PSCoord::execGREP_REMOVE_CONF(Signal* signal) +{ + jamEntry(); + GrepRemoveConf * const conf = (GrepRemoveConf *) signal->getDataPtr(); + Uint32 subId = conf->subscriptionId; + Uint32 subKey = conf->subscriptionKey; + Uint32 senderNodeId = conf->senderNodeId; + Uint32 subData = conf->senderData; + SubCoordinatorPtr subPtr; + c_subCoordinatorPool.getPtr(subPtr, subData); + + ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_REMOVE_REQ); + + subPtr.p->m_outstandingParticipants.clearWaitingFor(senderNodeId); + + if(!subPtr.p->m_outstandingParticipants.done()) { + jam(); + return; + } + jam(); + + /************************* + * All participants ready + *************************/ + + m_grep->sendEventRep(signal, + NDB_LE_GrepSubscriptionInfo, + GrepEvent::GrepPS_SubRemoveConf, + subId, subKey, + GrepError::GE_NO_ERROR); + + GrepSubRemoveConf * grepConf = (GrepSubRemoveConf *) conf; + grepConf->subscriptionId = subId; + grepConf->subscriptionKey = subKey; + sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_REMOVE_CONF, signal, + GrepSubRemoveConf::SignalLength, JBB); + + c_subCoordinatorPool.release(subPtr); +} + + + +void +Grep::PSCoord::execGREP_REMOVE_REF(Signal* signal) +{ + jamEntry(); + GrepRemoveRef * const ref = (GrepRemoveRef *)signal->getDataPtr(); + Uint32 subData = ref->senderData; + Uint32 err = ref->err; + SubCoordinatorPtr subPtr; + + /** + * Get the operationrecord matching subdata and remove it. Subsequent + * execGREP_REMOVE_REF will simply be ignored at this stage. + */ + for( c_runningSubscriptions.first(c_subPtr); + !c_subPtr.isNull(); c_runningSubscriptions.next(c_subPtr)) { + jam(); + subPtr.i = c_subPtr.curr.i; + subPtr.p = c_runningSubscriptions.getPtr(subPtr.i); + if(subData == subPtr.i) + { + sendRefToSS(signal, *subPtr.p, (GrepError::GE_Code)err /*error*/); + c_runningSubscriptions.release(subPtr); + return; + } + } + return; +} + + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: LOG RECORDS (COMING IN FROM LOCAL SUMA) + * ------------------------------------------------------------------------ + * + * After the subscription is started, we get log records from SUMA. + * Both table data and meta data log records are received. + * + * TODO: + * @todo Changes in meta data is currently not + * allowed during global replication + **************************************************************************/ + +void +Grep::PSPart::execSUB_META_DATA(Signal* signal) +{ + jamEntry(); + if(m_recoveryMode) { + jam(); + return; + } + /** + * METASCAN and METALOG + */ + SubMetaData * data = (SubMetaData *) signal->getDataPtrSend(); + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, data->subscriberData); + + /*************************** + * Forward data to REP node + ***************************/ + sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_META_DATA, signal, + SubMetaData::SignalLength, JBB); +#ifdef DEBUG_GREP_SUBSCRIPTION + ndbout_c("Grep::PSPart: Sent SUB_META_DATA to REP " + "(TableId: %d, SenderData: %d, GCI: %d)", + data->tableId, data->senderData, data->gci); +#endif +} + +/** + * Receive table data from SUMA and dispatches it to REP node. + */ +void +Grep::PSPart::execSUB_TABLE_DATA(Signal* signal) +{ + jamEntry(); + if(m_recoveryMode) { + jam(); + return; + } + ndbrequire(m_repRef!=0); + + if(!assembleFragments(signal)) { jam(); return; } + + /** + * Check if it is SCAN or LOG data that has arrived + */ + if(signal->getNoOfSections() == 2) + { + jam(); + /** + * DATASCAN - Not marked with GCI, so mark with latest seen GCI + */ + if(m_firstScanGCI == 1 && m_lastScanGCI == 0) { + m_firstScanGCI = m_latestSeenGCI; + m_lastScanGCI = m_latestSeenGCI; + } + SubTableData * data = (SubTableData*)signal->getDataPtrSend(); + Uint32 subData = data->senderData; + data->gci = m_latestSeenGCI; + data->logType = SubTableData::SCAN; + + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA, signal, + SubTableData::SignalLength, JBB); +#ifdef DEBUG_GREP + ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Scan, GCI: %d)", + data->gci); +#endif + } + else + { + jam(); + /** + * DATALOG (TRIGGER) - Already marked with GCI + */ + SubTableData * data = (SubTableData*)signal->getDataPtrSend(); + data->logType = SubTableData::LOG; + Uint32 subData = data->senderData; + if (data->gci > m_latestSeenGCI) m_latestSeenGCI = data->gci; + + // Reformat to sections and send to replication node. + LinearSectionPtr ptr[3]; + ptr[0].p = signal->theData + 25; + ptr[0].sz = data->noOfAttributes; + ptr[1].p = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE; + ptr[1].sz = data->dataSize; + + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA, + signal, SubTableData::SignalLength, JBB, ptr, 2); +#ifdef DEBUG_GREP + ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Log, GCI: %d)", + data->gci); +#endif + } +} + + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: START SYNCHRONIZATION + * ------------------------------------------------------------------------ + * + * + **************************************************************************/ + +/** + * Request to start sync (from Rep SS) + */ +void +Grep::PSCoord::execGREP_SUB_SYNC_REQ(Signal* signal) +{ + jamEntry(); + GrepSubSyncReq * const subReq = (GrepSubSyncReq*)signal->getDataPtr(); + SubscriptionData::Part part = (SubscriptionData::Part) subReq->part; + Uint32 subId = subReq->subscriptionId; + Uint32 subKey = subReq->subscriptionKey; + BlockReference rep = signal->getSendersBlockRef(); + + SubCoordinatorPtr subPtr; + if( !c_subCoordinatorPool.seize(subPtr)) { + jam(); + SubCoordinator sub; + sub.m_subscriberRef = rep; + sub.m_subscriptionId = 0; + sub.m_subscriptionKey = 0; + sub.m_outstandingRequest = GSN_GREP_SYNC_REQ; + sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL); + return; + } + + prepareOperationRec(subPtr, + numberToRef(PSREPBLOCKNO, refToNode(rep)), + subId, subKey, + GSN_GREP_SYNC_REQ); + + GrepSyncReq * req = (GrepSyncReq *)subReq; + req->subscriptionId = subPtr.p->m_subscriptionId; + req->subscriptionKey = subPtr.p->m_subscriptionKey; + req->senderData = subPtr.p->m_subscriberData; + req->part = (Uint32)part; + + /*************************** + * Send to all participants + ***************************/ + NodeReceiverGroup rg(GREP, m_grep->m_aliveNodes); + subPtr.p->m_outstandingParticipants = rg; + sendSignal(rg, + GSN_GREP_SYNC_REQ, signal, GrepSyncReq::SignalLength, JBB); +} + + +/** + * Sync req from Grep::PSCoord to PS particpant + */ +void +Grep::PSPart::execGREP_SYNC_REQ(Signal* signal) +{ + jamEntry(); + + GrepSyncReq * const grepReq = (GrepSyncReq *) signal->getDataPtr(); + Uint32 part = grepReq->part; + Uint32 subId = grepReq->subscriptionId; + Uint32 subKey = grepReq->subscriptionKey; + Uint32 subData = grepReq->senderData; + + Subscription key; + key.m_subscriptionId = subId; + key.m_subscriptionKey = subKey; + SubscriptionPtr subPtr; + ndbrequire(c_subscriptions.find(subPtr, key)); + subPtr.p->m_operationPtrI = subData; + subPtr.p->m_outstandingRequest = GSN_GREP_SYNC_REQ; + /********************************** + * Send SUB_SYNC_REQ to local SUMA + **********************************/ + SubSyncReq * sumaReq = (SubSyncReq *)grepReq; + sumaReq->subscriptionId = subId; + sumaReq->subscriptionKey = subKey; + sumaReq->subscriberData = subPtr.i; + sumaReq->part = part; + sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, signal, + SubSyncReq::SignalLength, JBB); +} + + +/** + * SYNC conf from SUMA + */ +void +Grep::PSPart::execSUB_SYNC_CONF(Signal* signal) +{ + jamEntry(); + + SubSyncConf * const conf = (SubSyncConf *) signal->getDataPtr(); + Uint32 part = conf->part; + Uint32 subId = conf->subscriptionId; + Uint32 subKey = conf->subscriptionKey; + Uint32 subData = conf->subscriberData; + + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + + ndbrequire(subPtr.p->m_subscriptionId == subId); + ndbrequire(subPtr.p->m_subscriptionKey == subKey); + + GrepSyncConf * grepConf = (GrepSyncConf *)conf; + grepConf->senderNodeId = getOwnNodeId(); + grepConf->part = part; + grepConf->firstGCI = m_firstScanGCI; + grepConf->lastGCI = m_lastScanGCI; + grepConf->subscriptionId = subId; + grepConf->subscriptionKey = subKey; + grepConf->senderData = subPtr.p->m_operationPtrI; + sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_SYNC_CONF, signal, + GrepSyncConf::SignalLength, JBB); + + m_firstScanGCI = 1; + m_lastScanGCI = 0; + subPtr.p->m_outstandingRequest = 0; +} + +/** + * Handle errors that either occured in: + * 1) PSPart + * or + * 2) propagated from local SUMA + * + * Propagates REF signal to PSCoord + */ +void +Grep::PSPart::execSUB_SYNC_REF(Signal* signal) { + jamEntry(); + SubSyncRef * const ref = (SubSyncRef *)signal->getDataPtr(); + Uint32 subData = ref->subscriberData; + GrepError::GE_Code err = (GrepError::GE_Code)ref->err; + SubscriptionData::Part part = (SubscriptionData::Part)ref->part; + + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr, subData); + sendRefToPSCoord(signal, *subPtr.p, err /*error*/ ,part); + subPtr.p->m_outstandingRequest = 0; +} + +/** + * Syncing has started... (says PS Participant) + */ +void +Grep::PSCoord::execGREP_SYNC_CONF(Signal* signal) +{ + jamEntry(); + + GrepSyncConf const * conf = (GrepSyncConf *)signal->getDataPtr(); + Uint32 part = conf->part; + Uint32 firstGCI = conf->firstGCI; + Uint32 lastGCI = conf->lastGCI; + Uint32 subId = conf->subscriptionId; + Uint32 subKey = conf->subscriptionKey; + Uint32 subData = conf->senderData; + + SubCoordinatorPtr subPtr; + c_subCoordinatorPool.getPtr(subPtr, subData); + ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_SYNC_REQ); + + subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId); + if(!subPtr.p->m_outstandingParticipants.done()) return; + + /** + * Send event + */ + GrepEvent::Subscription event; + if(part == SubscriptionData::MetaData) + event = GrepEvent::GrepPS_SubSyncMetaConf; + else + event = GrepEvent::GrepPS_SubSyncDataConf; + + /* @todo Johan: Add firstGCI here. /Lars */ + m_grep->sendEventRep(signal, NDB_LE_GrepSubscriptionInfo, + event, subId, subKey, + (Uint32)GrepError::GE_NO_ERROR, + lastGCI); + + /************************* + * All participants ready + *************************/ + GrepSubSyncConf * grepConf = (GrepSubSyncConf *)conf; + grepConf->part = part; + grepConf->firstGCI = firstGCI; + grepConf->lastGCI = lastGCI; + grepConf->subscriptionId = subId; + grepConf->subscriptionKey = subKey; + + sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_SYNC_CONF, signal, + GrepSubSyncConf::SignalLength, JBB); + c_subCoordinatorPool.release(subPtr); +} + +/** + * Handle errors that either occured in: + * 1) PSCoord + * or + * 2) propagated from PSPart + */ +void +Grep::PSCoord::execGREP_SYNC_REF(Signal* signal) { + jamEntry(); + GrepSyncRef * const ref = (GrepSyncRef *)signal->getDataPtr(); + Uint32 subData = ref->senderData; + SubscriptionData::Part part = (SubscriptionData::Part)ref->part; + GrepError::GE_Code err = (GrepError::GE_Code)ref->err; + SubCoordinatorPtr subPtr; + c_runningSubscriptions.getPtr(subPtr, subData); + sendRefToSS(signal, *subPtr.p, err /*error*/, part); +} + + + +void +Grep::PSCoord::sendRefToSS(Signal * signal, + SubCoordinator sub, + GrepError::GE_Code err, + SubscriptionData::Part part) { + /** + + GrepCreateRef * ref = (GrepCreateRef*)signal->getDataPtrSend(); + ref->senderData = sub.m_subscriberData; + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + ref->err = err; + sendSignal(sub.m_coordinatorRef, GSN_GREP_CREATE_REF, signal, + GrepCreateRef::SignalLength, JBB); +*/ + + jam(); + GrepEvent::Subscription event; + switch(sub.m_outstandingRequest) { + case GSN_GREP_CREATE_SUBID_REQ: + { + jam(); + CreateSubscriptionIdRef * ref = + (CreateSubscriptionIdRef*)signal->getDataPtrSend(); + ref->err = (Uint32)err; + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + sendSignal(sub.m_subscriberRef, + GSN_GREP_CREATE_SUBID_REF, + signal, + CreateSubscriptionIdRef::SignalLength, + JBB); + event = GrepEvent::GrepPS_CreateSubIdRef; + } + break; + case GSN_GREP_CREATE_REQ: + { + jam(); + GrepSubCreateRef * ref = (GrepSubCreateRef*)signal->getDataPtrSend(); + ref->err = (Uint32)err; + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_CREATE_REF, signal, + GrepSubCreateRef::SignalLength, JBB); + event = GrepEvent::GrepPS_SubCreateRef; + } + break; + case GSN_GREP_SYNC_REQ: + { + jam(); + GrepSubSyncRef * ref = (GrepSubSyncRef*)signal->getDataPtrSend(); + ref->err = (Uint32)err; + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + ref->part = (SubscriptionData::Part) part; + sendSignal(sub.m_subscriberRef, + GSN_GREP_SUB_SYNC_REF, + signal, + GrepSubSyncRef::SignalLength, + JBB); + if(part == SubscriptionData::MetaData) + event = GrepEvent::GrepPS_SubSyncMetaRef; + else + event = GrepEvent::GrepPS_SubSyncDataRef; + } + break; + case GSN_GREP_START_REQ: + { + jam(); + GrepSubStartRef * ref = (GrepSubStartRef*)signal->getDataPtrSend(); + ref->err = (Uint32)err; + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + + sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_START_REF, + signal, GrepSubStartRef::SignalLength, JBB); + if(part == SubscriptionData::MetaData) + event = GrepEvent::GrepPS_SubStartMetaRef; + else + event = GrepEvent::GrepPS_SubStartDataRef; + /** + * Send event report + */ + m_grep->sendEventRep(signal, + NDB_LE_GrepSubscriptionAlert, + event, + sub.m_subscriptionId, + sub.m_subscriptionKey, + (Uint32)err); + } + break; + case GSN_GREP_REMOVE_REQ: + { + jam(); + GrepSubRemoveRef * ref = (GrepSubRemoveRef*)signal->getDataPtrSend(); + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + ref->err = (Uint32)err; + + sendSignal(sub.m_subscriberRef, + GSN_GREP_SUB_REMOVE_REF, + signal, + GrepSubRemoveRef::SignalLength, + JBB); + + event = GrepEvent::GrepPS_SubRemoveRef; + } + break; + default: + ndbrequire(false); + event= GrepEvent::Rep_Disconnect; // remove compiler warning + } + /** + * Finally, send an event. + */ + m_grep->sendEventRep(signal, + NDB_LE_GrepSubscriptionAlert, + event, + sub.m_subscriptionId, + sub.m_subscriptionKey, + err); + +} + + +void +Grep::PSPart::sendRefToPSCoord(Signal * signal, + Subscription sub, + GrepError::GE_Code err, + SubscriptionData::Part part) { + + jam(); + GrepEvent::Subscription event; + switch(sub.m_outstandingRequest) { + + case GSN_GREP_CREATE_REQ: + { + GrepCreateRef * ref = (GrepCreateRef*)signal->getDataPtrSend(); + ref->senderData = sub.m_subscriberData; + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + ref->err = err; + sendSignal(sub.m_coordinatorRef, GSN_GREP_CREATE_REF, signal, + GrepCreateRef::SignalLength, JBB); + + event = GrepEvent::GrepPS_SubCreateRef; + } + break; + case GSN_GREP_SYNC_REQ: + { + GrepSyncRef * ref = (GrepSyncRef*)signal->getDataPtrSend(); + ref->senderData = sub.m_subscriberData; + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + ref->part = part; + ref->err = err; + sendSignal(sub.m_coordinatorRef, + GSN_GREP_SYNC_REF, signal, + GrepSyncRef::SignalLength, JBB); + if(part == SubscriptionData::MetaData) + event = GrepEvent::GrepPS_SubSyncMetaRef; + else + event = GrepEvent::GrepPS_SubSyncDataRef; + } + break; + case GSN_GREP_START_REQ: + { + jam(); + GrepStartRef * ref = (GrepStartRef*)signal->getDataPtrSend(); + ref->senderData = sub.m_subscriberData; + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + ref->part = (Uint32) part; + ref->err = err; + sendSignal(sub.m_coordinatorRef, GSN_GREP_START_REF, signal, + GrepStartRef::SignalLength, JBB); + if(part == SubscriptionData::MetaData) + event = GrepEvent::GrepPS_SubStartMetaRef; + else + event = GrepEvent::GrepPS_SubStartDataRef; + } + break; + + case GSN_GREP_REMOVE_REQ: + { + jamEntry(); + GrepRemoveRef * ref = (GrepRemoveRef*)signal->getDataPtrSend(); + ref->senderData = sub.m_operationPtrI; + ref->subscriptionId = sub.m_subscriptionId; + ref->subscriptionKey = sub.m_subscriptionKey; + ref->err = err; + sendSignal(sub.m_coordinatorRef, GSN_GREP_REMOVE_REF, signal, + GrepCreateRef::SignalLength, JBB); + + } + break; + default: + ndbrequire(false); + event= GrepEvent::Rep_Disconnect; // remove compiler warning + } + + /** + * Finally, send an event. + */ + m_grep->sendEventRep(signal, + NDB_LE_GrepSubscriptionAlert, + event, + sub.m_subscriptionId, + sub.m_subscriptionKey, + err); + +} + +/************************************************************************** + * ------------------------------------------------------------------------ + * MODULE: GREP PS Coordinator GCP + * ------------------------------------------------------------------------ + * + * + **************************************************************************/ + +void +Grep::PSPart::execSUB_GCP_COMPLETE_REP(Signal* signal) +{ + jamEntry(); + if(m_recoveryMode) { + jam(); + return; + } + SubGcpCompleteRep * rep = (SubGcpCompleteRep *)signal->getDataPtrSend(); + rep->senderRef = reference(); + + if (rep->gci > m_latestSeenGCI) m_latestSeenGCI = rep->gci; + SubscriptionPtr subPtr; + c_subscriptions.first(c_subPtr); + for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) { + + subPtr.i = c_subPtr.curr.i; + subPtr.p = c_subscriptions.getPtr(subPtr.i); + sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_GCP_COMPLETE_REP, signal, + SubGcpCompleteRep::SignalLength, JBB); + } + +#ifdef DEBUG_GREP + ndbout_c("Grep::PSPart: Recd SUB_GCP_COMPLETE_REP " + "(GCI: %d, nodeId: %d) from SUMA", + rep->gci, refToNode(rep->senderRef)); +#endif +} + + +void +Grep::PSPart::execSUB_SYNC_CONTINUE_REQ(Signal* signal) +{ + jamEntry(); + SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtr(); + Uint32 subData = req->subscriberData; + + SubscriptionPtr subPtr; + c_subscriptions.getPtr(subPtr,subData); + + /** + * @todo Figure out how to control how much data we can receive? + */ + SubSyncContinueConf * conf = (SubSyncContinueConf*)req; + conf->subscriptionId = subPtr.p->m_subscriptionId; + conf->subscriptionKey = subPtr.p->m_subscriptionKey; + sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, + SubSyncContinueConf::SignalLength, JBB); +} + +void +Grep::sendEventRep(Signal * signal, + Ndb_logevent_type type, + GrepEvent::Subscription event, + Uint32 subId, + Uint32 subKey, + Uint32 err, + Uint32 other) { + jam(); + signal->theData[0] = type; + signal->theData[1] = event; + signal->theData[2] = subId; + signal->theData[3] = subKey; + signal->theData[4] = err; + + if(other==0) + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5 ,JBB); + else { + signal->theData[5] = other; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 6 ,JBB); + } +} |