summaryrefslogtreecommitdiff
path: root/ndb
diff options
context:
space:
mode:
authorpekka@orca.ndb.mysql.com <>2006-08-22 14:58:38 +0200
committerpekka@orca.ndb.mysql.com <>2006-08-22 14:58:38 +0200
commita5b835abbd4eb427dec601f38e292df7f002793f (patch)
tree2749492f375378597e7bd474f93906730c3e55b7 /ndb
parent1b6f2f1c328edde9a9dca9e99fb1193a719fa0c3 (diff)
parent9dc6087d4df6c4fe77fc2b9cd36e1f8f446932e4 (diff)
downloadmariadb-git-a5b835abbd4eb427dec601f38e292df7f002793f.tar.gz
Merge orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-ndb
into orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-bug21017
Diffstat (limited to 'ndb')
-rw-r--r--ndb/src/kernel/blocks/backup/Backup.cpp64
-rw-r--r--ndb/src/mgmapi/mgmapi.cpp11
-rw-r--r--ndb/src/mgmclient/CommandInterpreter.cpp31
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.cpp11
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.hpp2
-rw-r--r--ndb/src/mgmsrv/Services.cpp1
-rw-r--r--ndb/src/ndbapi/ClusterMgr.cpp98
-rw-r--r--ndb/src/ndbapi/ClusterMgr.hpp10
8 files changed, 181 insertions, 47 deletions
diff --git a/ndb/src/kernel/blocks/backup/Backup.cpp b/ndb/src/kernel/blocks/backup/Backup.cpp
index 43c1de5e2b3..10318e5f52d 100644
--- a/ndb/src/kernel/blocks/backup/Backup.cpp
+++ b/ndb/src/kernel/blocks/backup/Backup.cpp
@@ -274,36 +274,48 @@ Backup::execCONTINUEB(Signal* signal)
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, ptr_I);
- TablePtr tabPtr;
- ptr.p->tables.getPtr(tabPtr, tabPtr_I);
- FragmentPtr fragPtr;
- tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I);
- BackupFilePtr filePtr;
- ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
-
- const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2;
- Uint32 * dst;
- if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz))
+ if (tabPtr_I == RNIL)
{
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4);
+ closeFiles(signal, ptr);
return;
}
+ jam();
+ TablePtr tabPtr;
+ ptr.p->tables.getPtr(tabPtr, tabPtr_I);
+ jam();
+ if(tabPtr.p->fragments.getSize())
+ {
+ FragmentPtr fragPtr;
+ tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I);
- BackupFormat::CtlFile::FragmentInfo * fragInfo =
- (BackupFormat::CtlFile::FragmentInfo*)dst;
- fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO);
- fragInfo->SectionLength = htonl(sz);
- fragInfo->TableId = htonl(fragPtr.p->tableId);
- fragInfo->FragmentNo = htonl(fragPtr_I);
- fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF);
- fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32);
- fragInfo->FilePosLow = htonl(0 & 0xFFFFFFFF);
- fragInfo->FilePosHigh = htonl(0 >> 32);
+ BackupFilePtr filePtr;
+ ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
- filePtr.p->operation.dataBuffer.updateWritePtr(sz);
+ const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2;
+ Uint32 * dst;
+ if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz))
+ {
+ sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4);
+ return;
+ }
+
+ BackupFormat::CtlFile::FragmentInfo * fragInfo =
+ (BackupFormat::CtlFile::FragmentInfo*)dst;
+ fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO);
+ fragInfo->SectionLength = htonl(sz);
+ fragInfo->TableId = htonl(fragPtr.p->tableId);
+ fragInfo->FragmentNo = htonl(fragPtr_I);
+ fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF);
+ fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32);
+ fragInfo->FilePosLow = htonl(0 & 0xFFFFFFFF);
+ fragInfo->FilePosHigh = htonl(0 >> 32);
+
+ filePtr.p->operation.dataBuffer.updateWritePtr(sz);
+
+ fragPtr_I++;
+ }
- fragPtr_I++;
if (fragPtr_I == tabPtr.p->fragments.getSize())
{
signal->theData[0] = tabPtr.p->tableId;
@@ -4243,6 +4255,12 @@ Backup::execSTOP_BACKUP_REQ(Signal* signal)
TablePtr tabPtr;
ptr.p->tables.first(tabPtr);
+ if (tabPtr.i == RNIL)
+ {
+ closeFiles(signal, ptr);
+ return;
+ }
+
signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO;
signal->theData[1] = ptr.i;
signal->theData[2] = tabPtr.i;
diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp
index 4428b158b6b..9bf19dda3a4 100644
--- a/ndb/src/mgmapi/mgmapi.cpp
+++ b/ndb/src/mgmapi/mgmapi.cpp
@@ -1389,7 +1389,7 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[],
MGM_END()
};
CHECK_HANDLE(handle, -1);
-
+
const char *hostname= ndb_mgm_get_connected_host(handle);
int port= ndb_mgm_get_connected_port(handle);
SocketClient s(hostname, port);
@@ -1411,19 +1411,20 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[],
}
args.put("filter", tmp.c_str());
}
-
+
int tmp = handle->socket;
handle->socket = sockfd;
-
+
const Properties *reply;
reply = ndb_mgm_call(handle, stat_reply, "listen event", &args);
-
+
handle->socket = tmp;
-
+
if(reply == NULL) {
close(sockfd);
CHECK_REPLY(reply, -1);
}
+ delete reply;
return sockfd;
}
diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp
index 58b98671b14..ba68f6e4f0a 100644
--- a/ndb/src/mgmclient/CommandInterpreter.cpp
+++ b/ndb/src/mgmclient/CommandInterpreter.cpp
@@ -173,8 +173,15 @@ private:
bool rep_connected;
#endif
struct NdbThread* m_event_thread;
+ NdbMutex *m_print_mutex;
};
+struct event_thread_param {
+ NdbMgmHandle *m;
+ NdbMutex **p;
+};
+
+NdbMutex* print_mutex;
/*
* Facade object for CommandInterpreter
@@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
m_connected= false;
m_event_thread= 0;
try_reconnect = 0;
+ m_print_mutex= NdbMutex_Create();
#ifdef HAVE_GLOBAL_REPLICATION
rep_host = NULL;
m_repserver = NULL;
@@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose)
CommandInterpreter::~CommandInterpreter()
{
disconnect();
+ NdbMutex_Destroy(m_print_mutex);
}
static bool
@@ -444,11 +453,13 @@ CommandInterpreter::printError()
static int do_event_thread;
static void*
-event_thread_run(void* m)
+event_thread_run(void* p)
{
DBUG_ENTER("event_thread_run");
- NdbMgmHandle handle= *(NdbMgmHandle*)m;
+ struct event_thread_param param= *(struct event_thread_param*)p;
+ NdbMgmHandle handle= *(param.m);
+ NdbMutex* printmutex= *(param.p);
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP,
1, NDB_MGM_EVENT_CATEGORY_STARTUP,
@@ -466,7 +477,11 @@ event_thread_run(void* m)
{
const char ping_token[]= "<PING>";
if (memcmp(ping_token,tmp,sizeof(ping_token)-1))
- ndbout << tmp;
+ if(tmp && strlen(tmp))
+ {
+ Guard g(printmutex);
+ ndbout << tmp;
+ }
}
} while(do_event_thread);
NDB_CLOSE_SOCKET(fd);
@@ -519,8 +534,11 @@ CommandInterpreter::connect()
assert(m_event_thread == 0);
assert(do_event_thread == 0);
do_event_thread= 0;
+ struct event_thread_param p;
+ p.m= &m_mgmsrv2;
+ p.p= &m_print_mutex;
m_event_thread = NdbThread_Create(event_thread_run,
- (void**)&m_mgmsrv2,
+ (void**)&p,
32768,
"CommandInterpreted_event_thread",
NDB_THREAD_PRIO_LOW);
@@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect,
int result= execute_impl(_line);
if (error)
*error= m_error;
+
return result;
}
@@ -686,6 +705,7 @@ CommandInterpreter::execute_impl(const char *_line)
DBUG_RETURN(true);
if (strcasecmp(firstToken, "SHOW") == 0) {
+ Guard g(m_print_mutex);
executeShow(allAfterFirstToken);
DBUG_RETURN(true);
}
@@ -920,6 +940,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun,
ndbout_c("Trying to start all nodes of system.");
ndbout_c("Use ALL STATUS to see the system start-up phases.");
} else {
+ Guard g(m_print_mutex);
struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv);
if(cl == 0){
ndbout_c("Unable get status from management server");
@@ -1224,6 +1245,7 @@ CommandInterpreter::executeShow(char* parameters)
if(it == 0){
ndbout_c("Unable to create config iterator");
+ ndb_mgm_destroy_configuration(conf);
return;
}
NdbAutoPtr<ndb_mgm_configuration_iterator> ptr(it);
@@ -1270,6 +1292,7 @@ CommandInterpreter::executeShow(char* parameters)
print_nodes(state, it, "ndb_mgmd", mgm_nodes, NDB_MGM_NODE_TYPE_MGM, 0);
print_nodes(state, it, "mysqld", api_nodes, NDB_MGM_NODE_TYPE_API, 0);
// ndbout << helpTextShow;
+ ndb_mgm_destroy_configuration(conf);
return;
} else if (strcasecmp(parameters, "PROPERTIES") == 0 ||
strcasecmp(parameters, "PROP") == 0) {
diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp
index 69c0286a1de..5fabb84adb7 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.cpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.cpp
@@ -77,7 +77,6 @@
}\
}
-extern int global_flag_send_heartbeat_now;
extern int g_no_nodeid_checks;
extern my_bool opt_core;
@@ -1455,6 +1454,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort)
#include <ClusterMgr.hpp>
+void
+MgmtSrvr::updateStatus()
+{
+ theFacade->theClusterMgr->forceHB();
+}
+
int
MgmtSrvr::status(int nodeId,
ndb_mgm_node_status * _status,
@@ -2153,7 +2158,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (found_matching_type && !found_free_node) {
// we have a temporary error which might be due to that
// we have got the latest connect status from db-nodes. Force update.
- global_flag_send_heartbeat_now= 1;
+ updateStatus();
}
BaseString type_string, type_c_string;
@@ -2507,7 +2512,7 @@ MgmtSrvr::Allocated_resources::~Allocated_resources()
if (!m_reserved_nodes.isclear()) {
m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes);
// node has been reserved, force update signal to ndb nodes
- global_flag_send_heartbeat_now= 1;
+ m_mgmsrv.updateStatus();
char tmp_str[128];
m_mgmsrv.m_reserved_nodes.getText(tmp_str);
diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp
index 187f225470a..17debb19f50 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.hpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.hpp
@@ -490,6 +490,8 @@ public:
void get_connected_nodes(NodeBitmask &connected_nodes) const;
SocketServer *get_socket_server() { return m_socket_server; }
+ void updateStatus();
+
//**************************************************************************
private:
//**************************************************************************
diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp
index 0524aba4c32..7f5b0e29442 100644
--- a/ndb/src/mgmsrv/Services.cpp
+++ b/ndb/src/mgmsrv/Services.cpp
@@ -982,6 +982,7 @@ printNodeStatus(OutputStream *output,
MgmtSrvr &mgmsrv,
enum ndb_mgm_node_type type) {
NodeId nodeId = 0;
+ mgmsrv.updateStatus();
while(mgmsrv.getNextNodeId(&nodeId, type)) {
enum ndb_mgm_node_status status;
Uint32 startPhase = 0,
diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp
index fbff57d3168..475561af225 100644
--- a/ndb/src/ndbapi/ClusterMgr.cpp
+++ b/ndb/src/ndbapi/ClusterMgr.cpp
@@ -37,7 +37,7 @@
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>
-int global_flag_send_heartbeat_now= 0;
+//#define DEBUG_REG
// Just a C wrapper for threadMain
extern "C"
@@ -67,6 +67,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
DBUG_ENTER("ClusterMgr::ClusterMgr");
ndbSetOwnVersion();
clusterMgrThreadMutex = NdbMutex_Create();
+ waitForHBCond= NdbCondition_Create();
+ waitingForHB= false;
noOfAliveNodes= 0;
noOfConnectedNodes= 0;
theClusterMgrThread= 0;
@@ -77,7 +79,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
ClusterMgr::~ClusterMgr()
{
DBUG_ENTER("ClusterMgr::~ClusterMgr");
- doStop();
+ doStop();
+ NdbCondition_Destroy(waitForHBCond);
NdbMutex_Destroy(clusterMgrThreadMutex);
DBUG_VOID_RETURN;
}
@@ -164,6 +167,70 @@ ClusterMgr::doStop( ){
}
void
+ClusterMgr::forceHB()
+{
+ theFacade.lock_mutex();
+
+ if(waitingForHB)
+ {
+ NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
+ theFacade.unlock_mutex();
+ return;
+ }
+
+ waitingForHB= true;
+
+ NodeBitmask ndb_nodes;
+ ndb_nodes.clear();
+ waitForHBFromNodes.clear();
+ for(Uint32 i = 0; i < MAX_NODES; i++)
+ {
+ if(!theNodes[i].defined)
+ continue;
+ if(theNodes[i].m_info.m_type == NodeInfo::DB)
+ {
+ ndb_nodes.set(i);
+ const ClusterMgr::Node &node= getNodeInfo(i);
+ waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes);
+ }
+ }
+ waitForHBFromNodes.bitAND(ndb_nodes);
+
+#ifdef DEBUG_REG
+ char buf[128];
+ ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
+#endif
+ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
+
+ signal.theVerId_signalNumber = GSN_API_REGREQ;
+ signal.theReceiversBlockNumber = QMGR;
+ signal.theTrace = 0;
+ signal.theLength = ApiRegReq::SignalLength;
+
+ ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
+ req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
+ req->version = NDB_VERSION;
+
+ int nodeId= 0;
+ for(int i=0;
+ NodeBitmask::NotFound!=(nodeId= waitForHBFromNodes.find(i));
+ i= nodeId+1)
+ {
+#ifdef DEBUG_REG
+ ndbout << "FORCE HB to " << nodeId << endl;
+#endif
+ theFacade.sendSignalUnCond(&signal, nodeId);
+ }
+
+ NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
+ waitingForHB= false;
+#ifdef DEBUG_REG
+ ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl;
+#endif
+ theFacade.unlock_mutex();
+}
+
+void
ClusterMgr::threadMain( ){
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
@@ -184,9 +251,6 @@ ClusterMgr::threadMain( ){
/**
* Start of Secure area for use of Transporter
*/
- int send_heartbeat_now= global_flag_send_heartbeat_now;
- global_flag_send_heartbeat_now= 0;
-
theFacade.lock_mutex();
for (int i = 1; i < MAX_NODES; i++){
/**
@@ -209,8 +273,7 @@ ClusterMgr::threadMain( ){
}
theNode.hbCounter += timeSlept;
- if (theNode.hbCounter >= theNode.hbFrequency ||
- send_heartbeat_now) {
+ if (theNode.hbCounter >= theNode.hbFrequency) {
/**
* It is now time to send a new Heartbeat
*/
@@ -226,7 +289,7 @@ ClusterMgr::threadMain( ){
if (theNode.m_info.m_type == NodeInfo::REP) {
signal.theReceiversBlockNumber = API_CLUSTERMGR;
}
-#if 0
+#ifdef DEBUG_REG
ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
#endif
theFacade.sendSignalUnCond(&signal, nodeId);
@@ -278,7 +341,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
const NodeId nodeId = refToNode(apiRegReq->ref);
-#if 0
+#ifdef DEBUG_REG
ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
#endif
@@ -319,7 +382,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
-#if 0
+#ifdef DEBUG_REG
ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
#endif
@@ -351,6 +414,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
if (node.m_info.m_type != NodeInfo::REP) {
node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
}
+
+ if(waitingForHB)
+ {
+ waitForHBFromNodes.clear(nodeId);
+
+ if(waitForHBFromNodes.isclear())
+ {
+ waitingForHB= false;
+ NdbCondition_Broadcast(waitForHBCond);
+ }
+ }
}
void
@@ -379,6 +453,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){
default:
break;
}
+
+ waitForHBFromNodes.clear(nodeId);
+ if(waitForHBFromNodes.isclear())
+ NdbCondition_Signal(waitForHBCond);
}
void
diff --git a/ndb/src/ndbapi/ClusterMgr.hpp b/ndb/src/ndbapi/ClusterMgr.hpp
index 1a1e622a889..d2bcc52f7e8 100644
--- a/ndb/src/ndbapi/ClusterMgr.hpp
+++ b/ndb/src/ndbapi/ClusterMgr.hpp
@@ -49,7 +49,9 @@ public:
void doStop();
void startThread();
-
+
+ void forceHB();
+
private:
void threadMain();
@@ -85,7 +87,11 @@ private:
Uint32 noOfConnectedNodes;
Node theNodes[MAX_NODES];
NdbThread* theClusterMgrThread;
-
+
+ NodeBitmask waitForHBFromNodes; // used in forcing HBs
+ NdbCondition* waitForHBCond;
+ bool waitingForHB;
+
/**
* Used for controlling start/stop of the thread
*/