diff options
author | pekka@orca.ndb.mysql.com <> | 2006-08-22 14:58:38 +0200 |
---|---|---|
committer | pekka@orca.ndb.mysql.com <> | 2006-08-22 14:58:38 +0200 |
commit | a5b835abbd4eb427dec601f38e292df7f002793f (patch) | |
tree | 2749492f375378597e7bd474f93906730c3e55b7 /ndb | |
parent | 1b6f2f1c328edde9a9dca9e99fb1193a719fa0c3 (diff) | |
parent | 9dc6087d4df6c4fe77fc2b9cd36e1f8f446932e4 (diff) | |
download | mariadb-git-a5b835abbd4eb427dec601f38e292df7f002793f.tar.gz |
Merge orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-ndb
into orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-bug21017
Diffstat (limited to 'ndb')
-rw-r--r-- | ndb/src/kernel/blocks/backup/Backup.cpp | 64 | ||||
-rw-r--r-- | ndb/src/mgmapi/mgmapi.cpp | 11 | ||||
-rw-r--r-- | ndb/src/mgmclient/CommandInterpreter.cpp | 31 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.cpp | 11 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.hpp | 2 | ||||
-rw-r--r-- | ndb/src/mgmsrv/Services.cpp | 1 | ||||
-rw-r--r-- | ndb/src/ndbapi/ClusterMgr.cpp | 98 | ||||
-rw-r--r-- | ndb/src/ndbapi/ClusterMgr.hpp | 10 |
8 files changed, 181 insertions, 47 deletions
diff --git a/ndb/src/kernel/blocks/backup/Backup.cpp b/ndb/src/kernel/blocks/backup/Backup.cpp index 43c1de5e2b3..10318e5f52d 100644 --- a/ndb/src/kernel/blocks/backup/Backup.cpp +++ b/ndb/src/kernel/blocks/backup/Backup.cpp @@ -274,36 +274,48 @@ Backup::execCONTINUEB(Signal* signal) BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptr_I); - TablePtr tabPtr; - ptr.p->tables.getPtr(tabPtr, tabPtr_I); - FragmentPtr fragPtr; - tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I); - BackupFilePtr filePtr; - ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr); - - const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2; - Uint32 * dst; - if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz)) + if (tabPtr_I == RNIL) { - sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4); + closeFiles(signal, ptr); return; } + jam(); + TablePtr tabPtr; + ptr.p->tables.getPtr(tabPtr, tabPtr_I); + jam(); + if(tabPtr.p->fragments.getSize()) + { + FragmentPtr fragPtr; + tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I); - BackupFormat::CtlFile::FragmentInfo * fragInfo = - (BackupFormat::CtlFile::FragmentInfo*)dst; - fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO); - fragInfo->SectionLength = htonl(sz); - fragInfo->TableId = htonl(fragPtr.p->tableId); - fragInfo->FragmentNo = htonl(fragPtr_I); - fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF); - fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32); - fragInfo->FilePosLow = htonl(0 & 0xFFFFFFFF); - fragInfo->FilePosHigh = htonl(0 >> 32); + BackupFilePtr filePtr; + ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr); - filePtr.p->operation.dataBuffer.updateWritePtr(sz); + const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2; + Uint32 * dst; + if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz)) + { + sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4); + return; + } + + BackupFormat::CtlFile::FragmentInfo * fragInfo = + (BackupFormat::CtlFile::FragmentInfo*)dst; + fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO); + fragInfo->SectionLength = htonl(sz); + fragInfo->TableId = htonl(fragPtr.p->tableId); + fragInfo->FragmentNo = htonl(fragPtr_I); + fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF); + fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32); + fragInfo->FilePosLow = htonl(0 & 0xFFFFFFFF); + fragInfo->FilePosHigh = htonl(0 >> 32); + + filePtr.p->operation.dataBuffer.updateWritePtr(sz); + + fragPtr_I++; + } - fragPtr_I++; if (fragPtr_I == tabPtr.p->fragments.getSize()) { signal->theData[0] = tabPtr.p->tableId; @@ -4243,6 +4255,12 @@ Backup::execSTOP_BACKUP_REQ(Signal* signal) TablePtr tabPtr; ptr.p->tables.first(tabPtr); + if (tabPtr.i == RNIL) + { + closeFiles(signal, ptr); + return; + } + signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO; signal->theData[1] = ptr.i; signal->theData[2] = tabPtr.i; diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp index 4428b158b6b..9bf19dda3a4 100644 --- a/ndb/src/mgmapi/mgmapi.cpp +++ b/ndb/src/mgmapi/mgmapi.cpp @@ -1389,7 +1389,7 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[], MGM_END() }; CHECK_HANDLE(handle, -1); - + const char *hostname= ndb_mgm_get_connected_host(handle); int port= ndb_mgm_get_connected_port(handle); SocketClient s(hostname, port); @@ -1411,19 +1411,20 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[], } args.put("filter", tmp.c_str()); } - + int tmp = handle->socket; handle->socket = sockfd; - + const Properties *reply; reply = ndb_mgm_call(handle, stat_reply, "listen event", &args); - + handle->socket = tmp; - + if(reply == NULL) { close(sockfd); CHECK_REPLY(reply, -1); } + delete reply; return sockfd; } diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp index 58b98671b14..ba68f6e4f0a 100644 --- a/ndb/src/mgmclient/CommandInterpreter.cpp +++ b/ndb/src/mgmclient/CommandInterpreter.cpp @@ -173,8 +173,15 @@ private: bool rep_connected; #endif struct NdbThread* m_event_thread; + NdbMutex *m_print_mutex; }; +struct event_thread_param { + NdbMgmHandle *m; + NdbMutex **p; +}; + +NdbMutex* print_mutex; /* * Facade object for CommandInterpreter @@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) m_connected= false; m_event_thread= 0; try_reconnect = 0; + m_print_mutex= NdbMutex_Create(); #ifdef HAVE_GLOBAL_REPLICATION rep_host = NULL; m_repserver = NULL; @@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) CommandInterpreter::~CommandInterpreter() { disconnect(); + NdbMutex_Destroy(m_print_mutex); } static bool @@ -444,11 +453,13 @@ CommandInterpreter::printError() static int do_event_thread; static void* -event_thread_run(void* m) +event_thread_run(void* p) { DBUG_ENTER("event_thread_run"); - NdbMgmHandle handle= *(NdbMgmHandle*)m; + struct event_thread_param param= *(struct event_thread_param*)p; + NdbMgmHandle handle= *(param.m); + NdbMutex* printmutex= *(param.p); int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 1, NDB_MGM_EVENT_CATEGORY_STARTUP, @@ -466,7 +477,11 @@ event_thread_run(void* m) { const char ping_token[]= "<PING>"; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) - ndbout << tmp; + if(tmp && strlen(tmp)) + { + Guard g(printmutex); + ndbout << tmp; + } } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); @@ -519,8 +534,11 @@ CommandInterpreter::connect() assert(m_event_thread == 0); assert(do_event_thread == 0); do_event_thread= 0; + struct event_thread_param p; + p.m= &m_mgmsrv2; + p.p= &m_print_mutex; m_event_thread = NdbThread_Create(event_thread_run, - (void**)&m_mgmsrv2, + (void**)&p, 32768, "CommandInterpreted_event_thread", NDB_THREAD_PRIO_LOW); @@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect, int result= execute_impl(_line); if (error) *error= m_error; + return result; } @@ -686,6 +705,7 @@ CommandInterpreter::execute_impl(const char *_line) DBUG_RETURN(true); if (strcasecmp(firstToken, "SHOW") == 0) { + Guard g(m_print_mutex); executeShow(allAfterFirstToken); DBUG_RETURN(true); } @@ -920,6 +940,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, ndbout_c("Trying to start all nodes of system."); ndbout_c("Use ALL STATUS to see the system start-up phases."); } else { + Guard g(m_print_mutex); struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv); if(cl == 0){ ndbout_c("Unable get status from management server"); @@ -1224,6 +1245,7 @@ CommandInterpreter::executeShow(char* parameters) if(it == 0){ ndbout_c("Unable to create config iterator"); + ndb_mgm_destroy_configuration(conf); return; } NdbAutoPtr<ndb_mgm_configuration_iterator> ptr(it); @@ -1270,6 +1292,7 @@ CommandInterpreter::executeShow(char* parameters) print_nodes(state, it, "ndb_mgmd", mgm_nodes, NDB_MGM_NODE_TYPE_MGM, 0); print_nodes(state, it, "mysqld", api_nodes, NDB_MGM_NODE_TYPE_API, 0); // ndbout << helpTextShow; + ndb_mgm_destroy_configuration(conf); return; } else if (strcasecmp(parameters, "PROPERTIES") == 0 || strcasecmp(parameters, "PROP") == 0) { diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index 69c0286a1de..5fabb84adb7 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -77,7 +77,6 @@ }\ } -extern int global_flag_send_heartbeat_now; extern int g_no_nodeid_checks; extern my_bool opt_core; @@ -1455,6 +1454,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort) #include <ClusterMgr.hpp> +void +MgmtSrvr::updateStatus() +{ + theFacade->theClusterMgr->forceHB(); +} + int MgmtSrvr::status(int nodeId, ndb_mgm_node_status * _status, @@ -2153,7 +2158,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, if (found_matching_type && !found_free_node) { // we have a temporary error which might be due to that // we have got the latest connect status from db-nodes. Force update. - global_flag_send_heartbeat_now= 1; + updateStatus(); } BaseString type_string, type_c_string; @@ -2507,7 +2512,7 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() if (!m_reserved_nodes.isclear()) { m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); // node has been reserved, force update signal to ndb nodes - global_flag_send_heartbeat_now= 1; + m_mgmsrv.updateStatus(); char tmp_str[128]; m_mgmsrv.m_reserved_nodes.getText(tmp_str); diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index 187f225470a..17debb19f50 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -490,6 +490,8 @@ public: void get_connected_nodes(NodeBitmask &connected_nodes) const; SocketServer *get_socket_server() { return m_socket_server; } + void updateStatus(); + //************************************************************************** private: //************************************************************************** diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index 0524aba4c32..7f5b0e29442 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -982,6 +982,7 @@ printNodeStatus(OutputStream *output, MgmtSrvr &mgmsrv, enum ndb_mgm_node_type type) { NodeId nodeId = 0; + mgmsrv.updateStatus(); while(mgmsrv.getNextNodeId(&nodeId, type)) { enum ndb_mgm_node_status status; Uint32 startPhase = 0, diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp index fbff57d3168..475561af225 100644 --- a/ndb/src/ndbapi/ClusterMgr.cpp +++ b/ndb/src/ndbapi/ClusterMgr.cpp @@ -37,7 +37,7 @@ #include <mgmapi_configuration.hpp> #include <mgmapi_config_parameters.h> -int global_flag_send_heartbeat_now= 0; +//#define DEBUG_REG // Just a C wrapper for threadMain extern "C" @@ -67,6 +67,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): DBUG_ENTER("ClusterMgr::ClusterMgr"); ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); + waitForHBCond= NdbCondition_Create(); + waitingForHB= false; noOfAliveNodes= 0; noOfConnectedNodes= 0; theClusterMgrThread= 0; @@ -77,7 +79,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): ClusterMgr::~ClusterMgr() { DBUG_ENTER("ClusterMgr::~ClusterMgr"); - doStop(); + doStop(); + NdbCondition_Destroy(waitForHBCond); NdbMutex_Destroy(clusterMgrThreadMutex); DBUG_VOID_RETURN; } @@ -164,6 +167,70 @@ ClusterMgr::doStop( ){ } void +ClusterMgr::forceHB() +{ + theFacade.lock_mutex(); + + if(waitingForHB) + { + NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); + theFacade.unlock_mutex(); + return; + } + + waitingForHB= true; + + NodeBitmask ndb_nodes; + ndb_nodes.clear(); + waitForHBFromNodes.clear(); + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if(!theNodes[i].defined) + continue; + if(theNodes[i].m_info.m_type == NodeInfo::DB) + { + ndb_nodes.set(i); + const ClusterMgr::Node &node= getNodeInfo(i); + waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes); + } + } + waitForHBFromNodes.bitAND(ndb_nodes); + +#ifdef DEBUG_REG + char buf[128]; + ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; +#endif + NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); + + signal.theVerId_signalNumber = GSN_API_REGREQ; + signal.theReceiversBlockNumber = QMGR; + signal.theTrace = 0; + signal.theLength = ApiRegReq::SignalLength; + + ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend()); + req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId()); + req->version = NDB_VERSION; + + int nodeId= 0; + for(int i=0; + NodeBitmask::NotFound!=(nodeId= waitForHBFromNodes.find(i)); + i= nodeId+1) + { +#ifdef DEBUG_REG + ndbout << "FORCE HB to " << nodeId << endl; +#endif + theFacade.sendSignalUnCond(&signal, nodeId); + } + + NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); + waitingForHB= false; +#ifdef DEBUG_REG + ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; +#endif + theFacade.unlock_mutex(); +} + +void ClusterMgr::threadMain( ){ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); @@ -184,9 +251,6 @@ ClusterMgr::threadMain( ){ /** * Start of Secure area for use of Transporter */ - int send_heartbeat_now= global_flag_send_heartbeat_now; - global_flag_send_heartbeat_now= 0; - theFacade.lock_mutex(); for (int i = 1; i < MAX_NODES; i++){ /** @@ -209,8 +273,7 @@ ClusterMgr::threadMain( ){ } theNode.hbCounter += timeSlept; - if (theNode.hbCounter >= theNode.hbFrequency || - send_heartbeat_now) { + if (theNode.hbCounter >= theNode.hbFrequency) { /** * It is now time to send a new Heartbeat */ @@ -226,7 +289,7 @@ ClusterMgr::threadMain( ){ if (theNode.m_info.m_type == NodeInfo::REP) { signal.theReceiversBlockNumber = API_CLUSTERMGR; } -#if 0 +#ifdef DEBUG_REG ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); #endif theFacade.sendSignalUnCond(&signal, nodeId); @@ -278,7 +341,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0]; const NodeId nodeId = refToNode(apiRegReq->ref); -#if 0 +#ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId); #endif @@ -319,7 +382,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); -#if 0 +#ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); #endif @@ -351,6 +414,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ if (node.m_info.m_type != NodeInfo::REP) { node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50; } + + if(waitingForHB) + { + waitForHBFromNodes.clear(nodeId); + + if(waitForHBFromNodes.isclear()) + { + waitingForHB= false; + NdbCondition_Broadcast(waitForHBCond); + } + } } void @@ -379,6 +453,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){ default: break; } + + waitForHBFromNodes.clear(nodeId); + if(waitForHBFromNodes.isclear()) + NdbCondition_Signal(waitForHBCond); } void diff --git a/ndb/src/ndbapi/ClusterMgr.hpp b/ndb/src/ndbapi/ClusterMgr.hpp index 1a1e622a889..d2bcc52f7e8 100644 --- a/ndb/src/ndbapi/ClusterMgr.hpp +++ b/ndb/src/ndbapi/ClusterMgr.hpp @@ -49,7 +49,9 @@ public: void doStop(); void startThread(); - + + void forceHB(); + private: void threadMain(); @@ -85,7 +87,11 @@ private: Uint32 noOfConnectedNodes; Node theNodes[MAX_NODES]; NdbThread* theClusterMgrThread; - + + NodeBitmask waitForHBFromNodes; // used in forcing HBs + NdbCondition* waitForHBCond; + bool waitingForHB; + /** * Used for controlling start/stop of the thread */ |