summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--acinclude.m47
-rw-r--r--configure.in8
-rw-r--r--mysql-test/mysql-test-run.sh11
-rw-r--r--mysql-test/ndb/Makefile.am2
-rw-r--r--mysql-test/ndb/ndbcluster.sh16
-rw-r--r--ndb/include/mgmapi/mgmapi.h1
-rw-r--r--ndb/include/util/Bitmask.hpp34
-rw-r--r--ndb/include/util/SocketServer.hpp4
-rw-r--r--ndb/src/common/mgmcommon/LocalConfig.cpp6
-rw-r--r--ndb/src/common/mgmcommon/Makefile.am2
-rw-r--r--ndb/src/common/mgmcommon/NdbConfig.c2
-rw-r--r--ndb/src/common/util/SocketServer.cpp15
-rw-r--r--ndb/src/mgmapi/mgmapi.cpp42
-rw-r--r--ndb/src/mgmclient/CommandInterpreter.cpp122
-rw-r--r--ndb/src/mgmclient/main.cpp46
-rw-r--r--ndb/src/mgmclient/ndb_mgmclient.h33
-rw-r--r--ndb/src/mgmclient/ndb_mgmclient.hpp8
-rw-r--r--ndb/src/mgmsrv/ConfigInfo.cpp8
-rw-r--r--ndb/src/mgmsrv/Makefile.am1
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.cpp88
-rw-r--r--ndb/src/mgmsrv/MgmtSrvr.hpp11
-rw-r--r--ndb/src/mgmsrv/Services.cpp42
-rw-r--r--ndb/src/mgmsrv/Services.hpp6
-rw-r--r--ndb/src/mgmsrv/main.cpp3
24 files changed, 422 insertions, 96 deletions
diff --git a/acinclude.m4 b/acinclude.m4
index 671e279a9f3..448bf7a2653 100644
--- a/acinclude.m4
+++ b/acinclude.m4
@@ -1614,9 +1614,14 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
--with-ndb-docs Include the NDB Cluster ndbapi and mgmapi documentation],
[ndb_docs="$withval"],
[ndb_docs=no])
+ AC_ARG_WITH([ndb-port],
+ [
+ --with-ndb-port Port for NDB Cluster management server],
+ [ndb_port="$withval"],
+ [ndb_port="default"])
AC_ARG_WITH([ndb-port-base],
[
- --with-ndb-port-base Base port for NDB Cluster],
+ --with-ndb-port-base Base port for NDB Cluster transporters],
[ndb_port_base="$withval"],
[ndb_port_base="default"])
diff --git a/configure.in b/configure.in
index cd2daf10690..764261c6933 100644
--- a/configure.in
+++ b/configure.in
@@ -3052,9 +3052,15 @@ AC_SUBST([NDB_DEFS])
AC_SUBST([ndb_cxxflags_fix])
+if test X"$ndb_port" = Xdefault
+then
+ ndb_port="1186"
+fi
+AC_SUBST([ndb_port])
+
if test X"$ndb_port_base" = Xdefault
then
- ndb_port_base="2200"
+ ndb_port_base="2202"
fi
AC_SUBST([ndb_port_base])
diff --git a/mysql-test/mysql-test-run.sh b/mysql-test/mysql-test-run.sh
index 65be9bf9c8e..fcb9e5eee2d 100644
--- a/mysql-test/mysql-test-run.sh
+++ b/mysql-test/mysql-test-run.sh
@@ -457,6 +457,9 @@ SMALL_SERVER="--key_buffer_size=1M --sort_buffer=256K --max_heap_table_size=1M"
export MASTER_MYPORT MASTER_MYPORT1 SLAVE_MYPORT MYSQL_TCP_PORT MASTER_MYSOCK MASTER_MYSOCK1
+NDBCLUSTER_BASE_PORT=`expr $NDBCLUSTER_PORT + 2`
+NDBCLUSTER_OPTS="--port=$NDBCLUSTER_PORT --port-base=$NDBCLUSTER_BASE_PORT --data-dir=$MYSQL_TEST_DIR/var"
+
if [ x$SOURCE_DIST = x1 ] ; then
MY_BASEDIR=$MYSQL_TEST_DIR
else
@@ -939,11 +942,11 @@ start_ndbcluster()
echo "Starting ndbcluster"
if [ "$DO_BENCH" = 1 ]
then
- NDBCLUSTER_OPTS=""
+ NDBCLUSTER_EXTRA_OPTS=""
else
- NDBCLUSTER_OPTS="--small"
+ NDBCLUSTER_EXTRA_OPTS="--small"
fi
- ./ndb/ndbcluster --port-base=$NDBCLUSTER_PORT $NDBCLUSTER_OPTS --diskless --initial --data-dir=$MYSQL_TEST_DIR/var || exit 1
+ ./ndb/ndbcluster $NDBCLUSTER_OPTS $NDBCLUSTER_EXTRA_OPTS --diskless --initial || exit 1
NDB_CONNECTSTRING="host=localhost:$NDBCLUSTER_PORT"
else
NDB_CONNECTSTRING="$USE_RUNNING_NDBCLUSTER"
@@ -961,7 +964,7 @@ stop_ndbcluster()
if [ -z "$USE_RUNNING_NDBCLUSTER" ]
then
# Kill any running ndbcluster stuff
- ./ndb/ndbcluster --data-dir=$MYSQL_TEST_DIR/var --port-base=$NDBCLUSTER_PORT --stop
+ ./ndb/ndbcluster $NDBCLUSTER_OPTS --stop
fi
fi
}
diff --git a/mysql-test/ndb/Makefile.am b/mysql-test/ndb/Makefile.am
index 3ed222344a6..502ccee099e 100644
--- a/mysql-test/ndb/Makefile.am
+++ b/mysql-test/ndb/Makefile.am
@@ -13,6 +13,8 @@ SUFFIXES = .sh
.sh:
@RM@ -f $@ $@-t
@SED@ \
+ -e 's!@''ndb_port''@!$(ndb_port)!g' \
+ -e 's!@''ndb_port_base''@!$(ndb_port_base)!g' \
-e 's!@''ndbbindir''@!$(ndbbindir)!g' \
-e 's!@''ndbtoolsdir''@!$(ndbtoolsdir)!g' \
$< > $@-t
diff --git a/mysql-test/ndb/ndbcluster.sh b/mysql-test/ndb/ndbcluster.sh
index f5757a11056..3486a879eec 100644
--- a/mysql-test/ndb/ndbcluster.sh
+++ b/mysql-test/ndb/ndbcluster.sh
@@ -5,7 +5,8 @@
# This scripts starts the table handler ndbcluster
# configurable parameters, make sure to change in mysqlcluterd as well
-port_base="2200"
+port=@ndb_port@
+port_base=@ndb_port_base@
fsdir=`pwd`
# end configurable parameters
@@ -84,6 +85,9 @@ while test $# -gt 0; do
--data-dir=*)
fsdir=`echo "$1" | sed -e "s;--data-dir=;;"`
;;
+ --port=*)
+ port=`echo "$1" | sed -e "s;--port=;;"`
+ ;;
--port-base=*)
port_base=`echo "$1" | sed -e "s;--port-base=;;"`
;;
@@ -94,7 +98,7 @@ while test $# -gt 0; do
shift
done
-fs_ndb="$fsdir/ndbcluster-$port_base"
+fs_ndb="$fsdir/ndbcluster-$port"
NDB_HOME=
if [ ! -x "$fsdir" ]; then
@@ -120,7 +124,7 @@ exec_ndb="$exec_ndb --no-defaults"
exec_waiter="$exec_waiter --no-defaults"
ndb_host="localhost"
-ndb_mgmd_port=$port_base
+ndb_mgmd_port=$port
NDB_CONNECTSTRING="host=$ndb_host:$ndb_mgmd_port"
export NDB_CONNECTSTRING
@@ -158,10 +162,6 @@ if [ -d "$fs_ndb" ]; then :; else
exit 1
fi
-# set som help variables
-
-port_transporter=`expr $ndb_mgmd_port + 2`
-
# Start management server as deamon
# Edit file system path and ports in config file
@@ -176,7 +176,7 @@ sed \
-e s,"CHOOSE_HOSTNAME_".*,"$ndb_host",g \
-e s,"CHOOSE_FILESYSTEM","$fs_ndb",g \
-e s,"CHOOSE_PORT_MGM","$ndb_mgmd_port",g \
- -e s,"CHOOSE_PORT_TRANSPORTER","$port_transporter",g \
+ -e s,"CHOOSE_PORT_TRANSPORTER","$port_base",g \
< ndb/ndb_config_2_node.ini \
> "$fs_ndb/config.ini"
fi
diff --git a/ndb/include/mgmapi/mgmapi.h b/ndb/include/mgmapi/mgmapi.h
index 6dcf58b44e2..f1ef357421b 100644
--- a/ndb/include/mgmapi/mgmapi.h
+++ b/ndb/include/mgmapi/mgmapi.h
@@ -733,6 +733,7 @@ extern "C" {
int param, unsigned long long * value);
int ndb_mgm_get_string_parameter(const ndb_mgm_configuration_iterator*,
int param, const char ** value);
+ int ndb_mgm_purge_stale_sessions(NdbMgmHandle handle, char **);
#ifdef __cplusplus
}
#endif
diff --git a/ndb/include/util/Bitmask.hpp b/ndb/include/util/Bitmask.hpp
index bb217adab5f..19aa604e4a1 100644
--- a/ndb/include/util/Bitmask.hpp
+++ b/ndb/include/util/Bitmask.hpp
@@ -105,6 +105,11 @@ public:
static void bitXOR(unsigned size, Uint32 data[], const Uint32 data2[]);
/**
+ * bitXORC - Bitwise (x ^ ~y) into first operand.
+ */
+ static void bitXORC(unsigned size, Uint32 data[], const Uint32 data2[]);
+
+ /**
* contains - Check if all bits set in data2 are set in data
*/
static bool contains(unsigned size, Uint32 data[], const Uint32 data2[]);
@@ -261,6 +266,14 @@ BitmaskImpl::bitXOR(unsigned size, Uint32 data[], const Uint32 data2[])
}
}
+inline void
+BitmaskImpl::bitXORC(unsigned size, Uint32 data[], const Uint32 data2[])
+{
+ for (unsigned i = 0; i < size; i++) {
+ data[i] ^= ~data2[i];
+ }
+}
+
inline bool
BitmaskImpl::contains(unsigned size, Uint32 data[], const Uint32 data2[])
{
@@ -452,6 +465,12 @@ public:
BitmaskPOD<size>& bitXOR(const BitmaskPOD<size>& mask2);
/**
+ * bitXORC - Bitwise (x ^ ~y) into first operand.
+ */
+ static void bitXORC(Uint32 data[], const Uint32 data2[]);
+ BitmaskPOD<size>& bitXORC(const BitmaskPOD<size>& mask2);
+
+ /**
* contains - Check if all bits set in data2 (that) are also set in data (this)
*/
static bool contains(Uint32 data[], const Uint32 data2[]);
@@ -713,6 +732,21 @@ BitmaskPOD<size>::bitXOR(const BitmaskPOD<size>& mask2)
}
template <unsigned size>
+inline void
+BitmaskPOD<size>::bitXORC(Uint32 data[], const Uint32 data2[])
+{
+ BitmaskImpl::bitXORC(size,data, data2);
+}
+
+template <unsigned size>
+inline BitmaskPOD<size>&
+BitmaskPOD<size>::bitXORC(const BitmaskPOD<size>& mask2)
+{
+ BitmaskPOD<size>::bitXORC(rep.data, mask2.rep.data);
+ return *this;
+}
+
+template <unsigned size>
char *
BitmaskPOD<size>::getText(const Uint32 data[], char* buf)
{
diff --git a/ndb/include/util/SocketServer.hpp b/ndb/include/util/SocketServer.hpp
index 3860b9ca84b..2fad991e5f8 100644
--- a/ndb/include/util/SocketServer.hpp
+++ b/ndb/include/util/SocketServer.hpp
@@ -37,7 +37,7 @@ public:
public:
virtual ~Session() {}
virtual void runSession(){}
- virtual void stopSession(){}
+ virtual void stopSession(){ m_stop = true; }
protected:
friend class SocketServer;
friend void* sessionThread_C(void*);
@@ -98,6 +98,8 @@ public:
*/
void stopSessions(bool wait = false);
+ void foreachSession(void (*f)(SocketServer::Session*, void*), void *data);
+
private:
struct SessionInstance {
Service * m_service;
diff --git a/ndb/src/common/mgmcommon/LocalConfig.cpp b/ndb/src/common/mgmcommon/LocalConfig.cpp
index 3cd4341c6b7..679de716be0 100644
--- a/ndb/src/common/mgmcommon/LocalConfig.cpp
+++ b/ndb/src/common/mgmcommon/LocalConfig.cpp
@@ -90,7 +90,7 @@ LocalConfig::init(const char *connectString,
//7. Check
{
char buf[256];
- BaseString::snprintf(buf, sizeof(buf), "host=localhost:%s", NDB_BASE_PORT);
+ BaseString::snprintf(buf, sizeof(buf), "host=localhost:%s", NDB_PORT);
if(readConnectString(buf, "default connect string"))
return true;
}
@@ -124,12 +124,12 @@ void LocalConfig::printUsage() const {
ndbout << "1. Put a Ndb.cfg file in the directory where you start"<<endl
<< " the node. "<< endl
<< " Ex: Ndb.cfg" << endl
- << " | host=localhost:"<<NDB_BASE_PORT<<endl;
+ << " | host=localhost:"<<NDB_PORT<<endl;
ndbout << "2. Use the environment variable NDB_CONNECTSTRING to "<<endl
<< " provide this information." <<endl
<< " Ex: " << endl
- << " >export NDB_CONNECTSTRING=\"host=localhost:"<<NDB_BASE_PORT<<"\""
+ << " >export NDB_CONNECTSTRING=\"host=localhost:"<<NDB_PORT<<"\""
<<endl<<endl;
}
diff --git a/ndb/src/common/mgmcommon/Makefile.am b/ndb/src/common/mgmcommon/Makefile.am
index ed6a526eb47..b787da51ab9 100644
--- a/ndb/src/common/mgmcommon/Makefile.am
+++ b/ndb/src/common/mgmcommon/Makefile.am
@@ -7,7 +7,7 @@ libmgmsrvcommon_la_SOURCES = \
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi -I$(top_srcdir)/ndb/src/mgmsrv
-DEFS_LOC = -DNDB_BASE_PORT="\"@ndb_port_base@\""
+DEFS_LOC = -DNDB_PORT="\"@ndb_port@\""
include $(top_srcdir)/ndb/config/common.mk.am
include $(top_srcdir)/ndb/config/type_ndbapi.mk.am
diff --git a/ndb/src/common/mgmcommon/NdbConfig.c b/ndb/src/common/mgmcommon/NdbConfig.c
index e92f8fa8392..8adc4c20dff 100644
--- a/ndb/src/common/mgmcommon/NdbConfig.c
+++ b/ndb/src/common/mgmcommon/NdbConfig.c
@@ -74,7 +74,7 @@ NdbConfig_NdbCfgName(int with_ndb_home){
static
char *get_prefix_buf(int len, int node_id)
{
- char tmp_buf[sizeof("ndb_pid#########")+1];
+ char tmp_buf[sizeof("ndb_pid#############")+1];
char *buf;
if (node_id > 0)
snprintf(tmp_buf, sizeof(tmp_buf), "ndb_%u", node_id);
diff --git a/ndb/src/common/util/SocketServer.cpp b/ndb/src/common/util/SocketServer.cpp
index c3cffa1399b..8bee256684d 100644
--- a/ndb/src/common/util/SocketServer.cpp
+++ b/ndb/src/common/util/SocketServer.cpp
@@ -259,6 +259,15 @@ transfer(NDB_SOCKET_TYPE sock){
}
void
+SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
+{
+ for(int i = m_sessions.size() - 1; i >= 0; i--){
+ (*func)(m_sessions[i].m_session, data);
+ }
+ checkSessions();
+}
+
+void
SocketServer::checkSessions(){
for(int i = m_sessions.size() - 1; i >= 0; i--){
if(m_sessions[i].m_session->m_stopped){
@@ -278,8 +287,10 @@ void
SocketServer::stopSessions(bool wait){
int i;
for(i = m_sessions.size() - 1; i>=0; i--)
- m_sessions[i].m_session->m_stop = true;
-
+ {
+ m_sessions[i].m_session->stopSession();
+ m_sessions[i].m_session->m_stop = true; // to make sure
+ }
for(i = m_services.size() - 1; i>=0; i--)
m_services[i].m_service->stopSessions();
diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp
index 4b62df968b3..66f0dbb1842 100644
--- a/ndb/src/mgmapi/mgmapi.cpp
+++ b/ndb/src/mgmapi/mgmapi.cpp
@@ -1834,4 +1834,46 @@ ndb_mgm_set_string_parameter(NdbMgmHandle handle,
return res;
}
+extern "C"
+int
+ndb_mgm_purge_stale_sessions(NdbMgmHandle handle, char **purged){
+ CHECK_HANDLE(handle, 0);
+ CHECK_CONNECTED(handle, 0);
+
+ Properties args;
+
+ const ParserRow<ParserDummy> reply[]= {
+ MGM_CMD("purge stale sessions reply", NULL, ""),
+ MGM_ARG("purged", String, Optional, ""),
+ MGM_ARG("result", String, Mandatory, "Error message"),
+ MGM_END()
+ };
+
+ const Properties *prop;
+ prop= ndb_mgm_call(handle, reply, "purge stale sessions", &args);
+
+ if(prop == NULL) {
+ SET_ERROR(handle, EIO, "Unable to purge stale sessions");
+ return -1;
+ }
+
+ int res= -1;
+ do {
+ const char * buf;
+ if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
+ ndbout_c("ERROR Message: %s\n", buf);
+ break;
+ }
+ if (purged) {
+ if (prop->get("purged", &buf))
+ *purged= strdup(buf);
+ else
+ *purged= 0;
+ }
+ res= 0;
+ } while(0);
+ delete prop;
+ return res;
+}
+
template class Vector<const ParserRow<ParserDummy>*>;
diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp
index e802ffff5ce..f1a953769b8 100644
--- a/ndb/src/mgmclient/CommandInterpreter.cpp
+++ b/ndb/src/mgmclient/CommandInterpreter.cpp
@@ -17,17 +17,6 @@
#include <ndb_global.h>
#include <my_sys.h>
-// copied from mysql.cc to get readline
-extern "C" {
-#if defined( __WIN__) || defined(OS2)
-#include <conio.h>
-#elif !defined(__NETWARE__)
-#include <readline/readline.h>
-extern "C" int add_history(const char *command); /* From readline directory */
-#define HAVE_READLINE
-#endif
-}
-
//#define HAVE_GLOBAL_REPLICATION
#include <Vector.hpp>
@@ -65,7 +54,6 @@ public:
*
* @return true until quit/bye/exit has been typed
*/
- int readAndExecute(int _try_reconnect=-1);
int execute(const char *_line, int _try_reconnect=-1);
private:
@@ -106,6 +94,7 @@ private:
*/
void executeHelp(char* parameters);
void executeShow(char* parameters);
+ void executePurge(char* parameters);
void executeShutdown(char* parameters);
void executeRun(char* parameters);
void executeInfo(char* parameters);
@@ -179,6 +168,7 @@ private:
*/
#include "ndb_mgmclient.hpp"
+#include "ndb_mgmclient.h"
Ndb_mgmclient::Ndb_mgmclient(const char *host)
{
@@ -188,10 +178,6 @@ Ndb_mgmclient::~Ndb_mgmclient()
{
delete m_cmd;
}
-int Ndb_mgmclient::read_and_execute(int _try_reconnect)
-{
- return m_cmd->readAndExecute(_try_reconnect);
-}
int Ndb_mgmclient::execute(const char *_line, int _try_reconnect)
{
return m_cmd->execute(_line,_try_reconnect);
@@ -202,7 +188,20 @@ Ndb_mgmclient::disconnect()
return m_cmd->disconnect();
}
-
+extern "C" {
+ Ndb_mgmclient_handle ndb_mgmclient_handle_create(const char *connect_string)
+ {
+ return (Ndb_mgmclient_handle) new Ndb_mgmclient(connect_string);
+ }
+ int ndb_mgmclient_execute(Ndb_mgmclient_handle h, int argc, const char** argv)
+ {
+ return ((Ndb_mgmclient*)h)->execute(argc, argv, 1);
+ }
+ int ndb_mgmclient_handle_destroy(Ndb_mgmclient_handle h)
+ {
+ delete (Ndb_mgmclient*)h;
+ }
+}
/*
* The CommandInterpreter
*/
@@ -226,6 +225,17 @@ Ndb_mgmclient::disconnect()
#include <util/InputStream.hpp>
#include <util/OutputStream.hpp>
+int Ndb_mgmclient::execute(int argc, const char** argv, int _try_reconnect)
+{
+ if (argc <= 0)
+ return 0;
+ BaseString _line(argv[0]);
+ for (int i= 1; i < argc; i++)
+ {
+ _line.appfmt(" %s", argv[i]);
+ }
+ return m_cmd->execute(_line.c_str(),_try_reconnect);
+}
/*****************************************************************************
* HELP
@@ -264,6 +274,7 @@ static const char* helpText =
#ifdef HAVE_GLOBAL_REPLICATION
"REP CONNECT <host:port> Connect to REP server on host:port\n"
#endif
+"PURGE STALE SESSIONS Reset reserved nodeid's in the mgmt server\n"
"QUIT Quit management client\n"
;
@@ -455,39 +466,6 @@ CommandInterpreter::disconnect()
//*****************************************************************************
int
-CommandInterpreter::readAndExecute(int _try_reconnect)
-{
- static char *line_read = (char *)NULL;
-
- /* If the buffer has already been allocated, return the memory
- to the free pool. */
- if (line_read)
- {
- free (line_read);
- line_read = (char *)NULL;
- }
-#ifdef HAVE_READLINE
- /* Get a line from the user. */
- line_read = readline ("ndb_mgm> ");
- /* If the line has any text in it, save it on the history. */
- if (line_read && *line_read)
- add_history (line_read);
-#else
- static char linebuffer[254];
- fputs("ndb_mgm> ", stdout);
- linebuffer[sizeof(linebuffer)-1]=0;
- line_read = fgets(linebuffer, sizeof(linebuffer)-1, stdin);
- if (line_read == linebuffer) {
- char *q=linebuffer;
- while (*q > 31) q++;
- *q=0;
- line_read= strdup(linebuffer);
- }
-#endif
- return execute(line_read,_try_reconnect);
-}
-
-int
CommandInterpreter::execute(const char *_line, int _try_reconnect)
{
if (_try_reconnect >= 0)
@@ -541,6 +519,10 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect)
executeAbortBackup(allAfterFirstToken);
return true;
}
+ else if (strcmp(firstToken, "PURGE") == 0) {
+ executePurge(allAfterFirstToken);
+ return true;
+ }
#ifdef HAVE_GLOBAL_REPLICATION
else if(strcmp(firstToken, "REPLICATION") == 0 ||
strcmp(firstToken, "REP") == 0) {
@@ -983,6 +965,46 @@ print_nodes(ndb_mgm_cluster_state *state, ndb_mgm_configuration_iterator *it,
}
void
+CommandInterpreter::executePurge(char* parameters)
+{
+ int command_ok= 0;
+ do {
+ if (emptyString(parameters))
+ break;
+ char* firstToken = strtok(parameters, " ");
+ char* nextToken = strtok(NULL, " \0");
+ if (strcmp(firstToken,"STALE") == 0 &&
+ nextToken &&
+ strcmp(nextToken, "SESSIONS") == 0) {
+ command_ok= 1;
+ break;
+ }
+ } while(0);
+
+ if (!command_ok) {
+ ndbout_c("Unexpected command, expected: PURGE STALE SESSIONS");
+ return;
+ }
+
+ int i;
+ char *str;
+ connect();
+
+ if (ndb_mgm_purge_stale_sessions(m_mgmsrv, &str)) {
+ ndbout_c("Command failed");
+ return;
+ }
+ if (str) {
+ ndbout_c("Purged sessions with node id's: %s", str);
+ free(str);
+ }
+ else
+ {
+ ndbout_c("No sessions purged");
+ }
+}
+
+void
CommandInterpreter::executeShow(char* parameters)
{
int i;
diff --git a/ndb/src/mgmclient/main.cpp b/ndb/src/mgmclient/main.cpp
index 3415ede1985..8f5d9e6656c 100644
--- a/ndb/src/mgmclient/main.cpp
+++ b/ndb/src/mgmclient/main.cpp
@@ -17,6 +17,17 @@
#include <ndb_global.h>
#include <ndb_opts.h>
+// copied from mysql.cc to get readline
+extern "C" {
+#if defined( __WIN__) || defined(OS2)
+#include <conio.h>
+#elif !defined(__NETWARE__)
+#include <readline/readline.h>
+extern "C" int add_history(const char *command); /* From readline directory */
+#define HAVE_READLINE
+#endif
+}
+
#include <NdbMain.h>
#include <NdbHost.h>
#include <mgmapi.h>
@@ -90,6 +101,39 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
return 0;
}
+static int
+read_and_execute(int _try_reconnect)
+{
+ static char *line_read = (char *)NULL;
+
+ /* If the buffer has already been allocated, return the memory
+ to the free pool. */
+ if (line_read)
+ {
+ free (line_read);
+ line_read = (char *)NULL;
+ }
+#ifdef HAVE_READLINE
+ /* Get a line from the user. */
+ line_read = readline ("ndb_mgm> ");
+ /* If the line has any text in it, save it on the history. */
+ if (line_read && *line_read)
+ add_history (line_read);
+#else
+ static char linebuffer[254];
+ fputs("ndb_mgm> ", stdout);
+ linebuffer[sizeof(linebuffer)-1]=0;
+ line_read = fgets(linebuffer, sizeof(linebuffer)-1, stdin);
+ if (line_read == linebuffer) {
+ char *q=linebuffer;
+ while (*q > 31) q++;
+ *q=0;
+ line_read= strdup(linebuffer);
+ }
+#endif
+ return com->execute(line_read,_try_reconnect);
+}
+
int main(int argc, char** argv){
NDB_INIT(argv[0]);
const char *_host = 0;
@@ -128,7 +172,7 @@ int main(int argc, char** argv){
signal(SIGPIPE, handler);
com = new Ndb_mgmclient(buf);
- while(com->read_and_execute(_try_reconnect));
+ while(read_and_execute(_try_reconnect));
delete com;
return 0;
diff --git a/ndb/src/mgmclient/ndb_mgmclient.h b/ndb/src/mgmclient/ndb_mgmclient.h
new file mode 100644
index 00000000000..265e6bc67ec
--- /dev/null
+++ b/ndb/src/mgmclient/ndb_mgmclient.h
@@ -0,0 +1,33 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef Ndb_mgmclient_h
+#define Ndb_mgmclient_h
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef void* Ndb_mgmclient_handle;
+Ndb_mgmclient_handle ndb_mgmclient_handle_create(const char *connect_string);
+int ndb_mgmclient_execute(Ndb_mgmclient_handle, int argc, const char** argv);
+int ndb_mgmclient_handle_destroy(Ndb_mgmclient_handle);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* Ndb_mgmclient_h */
diff --git a/ndb/src/mgmclient/ndb_mgmclient.hpp b/ndb/src/mgmclient/ndb_mgmclient.hpp
index 2f021a0f2b6..933d1bab5ce 100644
--- a/ndb/src/mgmclient/ndb_mgmclient.hpp
+++ b/ndb/src/mgmclient/ndb_mgmclient.hpp
@@ -14,8 +14,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-#ifndef Ndb_mgmclient_h
-#define Ndb_mgmclient_h
+#ifndef Ndb_mgmclient_hpp
+#define Ndb_mgmclient_hpp
class CommandInterpreter;
class Ndb_mgmclient
@@ -23,11 +23,11 @@ class Ndb_mgmclient
public:
Ndb_mgmclient(const char*);
~Ndb_mgmclient();
- int read_and_execute(int _try_reconnect=-1);
int execute(const char *_line, int _try_reconnect=-1);
+ int execute(int argc, const char** argv, int _try_reconnect=-1);
int disconnect();
private:
CommandInterpreter *m_cmd;
};
-#endif // Ndb_mgmclient_h
+#endif // Ndb_mgmclient_hpp
diff --git a/ndb/src/mgmsrv/ConfigInfo.cpp b/ndb/src/mgmsrv/ConfigInfo.cpp
index ad346b30ead..4af1556a0c0 100644
--- a/ndb/src/mgmsrv/ConfigInfo.cpp
+++ b/ndb/src/mgmsrv/ConfigInfo.cpp
@@ -1478,7 +1478,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::INT,
- NDB_BASE_PORT,
+ NDB_PORT,
"0",
STR_VALUE(MAX_INT_RNIL) },
@@ -1490,7 +1490,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::INT,
- "2199",
+ UNDEFINED,
"0",
STR_VALUE(MAX_INT_RNIL) },
@@ -3010,7 +3010,7 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
if(!(ctx.m_userDefaults &&
ctx.m_userDefaults->get("PortNumber", &base)) &&
!ctx.m_systemDefaults->get("PortNumber", &base)) {
- base= strtoll(NDB_BASE_PORT,0,0)+2;
+ base= strtoll(NDB_BASE_PORT,0,0);
// ctx.reportError("Cannot retrieve base port number");
// return false;
}
@@ -3442,7 +3442,7 @@ static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
#if 0
Properties * props= ctx.m_config;
Properties computers(true);
- Uint32 port_base = NDB_BASE_PORT+2;
+ Uint32 port_base = NDB_BASE_PORT;
Uint32 nNodes;
ctx.m_userProperties.get("NoOfNodes", &nNodes);
diff --git a/ndb/src/mgmsrv/Makefile.am b/ndb/src/mgmsrv/Makefile.am
index 4cb164d48fe..ee5220ef9f8 100644
--- a/ndb/src/mgmsrv/Makefile.am
+++ b/ndb/src/mgmsrv/Makefile.am
@@ -33,6 +33,7 @@ DEFS_LOC = -DDEFAULT_MYSQL_HOME="\"$(MYSQLBASEdir)\"" \
-DDATADIR="\"$(MYSQLDATAdir)\"" \
-DSHAREDIR="\"$(MYSQLSHAREdir)\"" \
-DMYSQLCLUSTERDIR="\"$(MYSQLCLUSTERdir)\"" \
+ -DNDB_PORT="\"@ndb_port@\"" \
-DNDB_BASE_PORT="\"@ndb_port_base@\""
include $(top_srcdir)/ndb/config/common.mk.am
diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp
index 2e30d73290b..a49b29af275 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.cpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.cpp
@@ -400,11 +400,13 @@ MgmtSrvr::getPort() const {
/* Constructor */
MgmtSrvr::MgmtSrvr(NodeId nodeId,
+ SocketServer *socket_server,
const BaseString &configFilename,
LocalConfig &local_config,
Config * config):
_blockNumber(1), // Hard coded block number since it makes it easy to send
// signals to other management servers.
+ m_socket_server(socket_server),
_ownReference(0),
m_local_config(local_config),
theSignalIdleList(NULL),
@@ -2094,6 +2096,25 @@ MgmtSrvr::getNodeType(NodeId nodeId) const
return nodeTypes[nodeId];
}
+void
+MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const
+{
+ if (theFacade && theFacade->theClusterMgr)
+ {
+ for(Uint32 i = 0; i < MAX_NODES; i++)
+ {
+ if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB)
+ {
+ const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i);
+ if (node.connected)
+ {
+ connected_nodes.bitOR(node.m_state.m_connected_nodes);
+ }
+ }
+ }
+ }
+}
+
bool
MgmtSrvr::alloc_node_id(NodeId * nodeId,
enum ndb_mgm_node_type type,
@@ -2106,7 +2127,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
*nodeId, type, client_addr));
if (g_no_nodeid_checks) {
if (*nodeId == 0) {
- error_string.appfmt("no-nodeid-ckecks set in manegment server.\n"
+ error_string.appfmt("no-nodeid-checks set in management server.\n"
"node id must be set explicitly in connectstring");
DBUG_RETURN(false);
}
@@ -2115,16 +2136,11 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
Guard g(m_node_id_mutex);
int no_mgm= 0;
NodeBitmask connected_nodes(m_reserved_nodes);
- for(Uint32 i = 0; i < MAX_NODES; i++)
+ get_connected_nodes(connected_nodes);
{
- if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB &&
- theFacade && theFacade->theClusterMgr) {
- const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i);
- if (node.connected) {
- connected_nodes.bitOR(node.m_state.m_connected_nodes);
- }
- } else if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM)
- no_mgm++;
+ for(Uint32 i = 0; i < MAX_NODES; i++)
+ if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM)
+ no_mgm++;
}
bool found_matching_id= false;
bool found_matching_type= false;
@@ -2227,6 +2243,10 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
m_connect_address[id_found].s_addr= 0;
}
m_reserved_nodes.set(id_found);
+ char tmp_str[128];
+ m_reserved_nodes.getText(tmp_str);
+ g_EventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, m_reserved_nodes %s.",
+ id_found, get_connect_address(id_found), tmp_str);
DBUG_RETURN(true);
}
@@ -2283,6 +2303,36 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
error_string.appfmt("No node defined with id=%d in config file.",
*nodeId);
}
+
+ g_EventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s. "
+ "Returned error string \"%s\"",
+ *nodeId,
+ client_addr != 0 ? inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr) : "<none>",
+ error_string.c_str());
+
+ NodeBitmask connected_nodes2;
+ get_connected_nodes(connected_nodes2);
+ {
+ BaseString tmp_connected, tmp_not_connected;
+ for(Uint32 i = 0; i < MAX_NODES; i++)
+ {
+ if (connected_nodes2.get(i))
+ {
+ if (!m_reserved_nodes.get(i))
+ tmp_connected.appfmt(" %d", i);
+ }
+ else if (m_reserved_nodes.get(i))
+ {
+ tmp_not_connected.appfmt(" %d", i);
+ }
+ }
+ if (tmp_connected.length() > 0)
+ g_EventLogger.info("Mgmt server state: node id's %s connected but not reserved",
+ tmp_connected.c_str());
+ if (tmp_not_connected.length() > 0)
+ g_EventLogger.info("Mgmt server state: node id's %s not connected but reserved",
+ tmp_not_connected.c_str());
+ }
DBUG_RETURN(false);
}
@@ -2531,10 +2581,15 @@ MgmtSrvr::Allocated_resources::~Allocated_resources()
{
Guard g(m_mgmsrv.m_node_id_mutex);
if (!m_reserved_nodes.isclear()) {
+ m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes);
// node has been reserved, force update signal to ndb nodes
global_flag_send_heartbeat_now= 1;
+
+ char tmp_str[128];
+ m_mgmsrv.m_reserved_nodes.getText(tmp_str);
+ g_EventLogger.info("Mgmt server state: nodeid %d freed, m_reserved_nodes %s.",
+ get_nodeid(), tmp_str);
}
- m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes);
}
void
@@ -2543,6 +2598,17 @@ MgmtSrvr::Allocated_resources::reserve_node(NodeId id)
m_reserved_nodes.set(id);
}
+NodeId
+MgmtSrvr::Allocated_resources::get_nodeid() const
+{
+ for(Uint32 i = 0; i < MAX_NODES; i++)
+ {
+ if (m_reserved_nodes.get(i))
+ return i;
+ }
+ return 0;
+}
+
int
MgmtSrvr::setDbParameter(int node, int param, const char * value,
BaseString& msg){
diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp
index c796e1e9219..b3257491123 100644
--- a/ndb/src/mgmsrv/MgmtSrvr.hpp
+++ b/ndb/src/mgmsrv/MgmtSrvr.hpp
@@ -96,7 +96,10 @@ public:
// methods to reserve/allocate resources which
// will be freed when running destructor
void reserve_node(NodeId id);
- bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId);}
+ bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); }
+ bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); }
+ bool isclear() { return m_reserved_nodes.isclear(); }
+ NodeId get_nodeid() const;
private:
MgmtSrvr &m_mgmsrv;
NodeBitmask m_reserved_nodes;
@@ -173,6 +176,7 @@ public:
/* Constructor */
MgmtSrvr(NodeId nodeId, /* Local nodeid */
+ SocketServer *socket_server,
const BaseString &config_filename, /* Where to save config */
LocalConfig &local_config, /* Ndb.cfg filename */
Config * config);
@@ -499,6 +503,9 @@ public:
int setDbParameter(int node, int parameter, const char * value, BaseString&);
const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); }
+ void get_connected_nodes(NodeBitmask &connected_nodes) const;
+ SocketServer *get_socket_server() { return m_socket_server; }
+
//**************************************************************************
private:
//**************************************************************************
@@ -525,6 +532,8 @@ private:
int _blockNumber;
NodeId _ownNodeId;
+ SocketServer *m_socket_server;
+
BlockReference _ownReference;
NdbMutex *m_configMutex;
const Config * _config;
diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp
index 2672d8c9d4b..0394c4e80bb 100644
--- a/ndb/src/mgmsrv/Services.cpp
+++ b/ndb/src/mgmsrv/Services.cpp
@@ -242,6 +242,8 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("node", Int, Optional, "Node"),
MGM_ARG("filter", String, Mandatory, "Event category"),
+ MGM_CMD("purge stale sessions", &MgmApiSession::purge_stale_sessions, ""),
+
MGM_END()
};
@@ -1412,6 +1414,46 @@ done:
m_output->println("");
}
+struct PurgeStruct
+{
+ NodeBitmask free_nodes;/* free nodes as reported
+ * by ndbd in apiRegReqConf
+ */
+ BaseString *str;
+};
+
+void
+MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data)
+{
+ MgmApiSession *s= (MgmApiSession *)_s;
+ struct PurgeStruct &ps= *(struct PurgeStruct *)data;
+ if (s->m_allocated_resources->is_reserved(ps.free_nodes))
+ {
+ ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid());
+ s->stopSession();
+ }
+}
+
+void
+MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx,
+ const class Properties &args)
+{
+ struct PurgeStruct ps;
+ BaseString str;
+ ps.str = &str;
+
+ m_mgmsrv.get_connected_nodes(ps.free_nodes);
+ ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes
+
+ m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps);
+
+ m_output->println("purge stale sessions reply");
+ if (str.length() > 0)
+ m_output->println("purged:%s",str.c_str());
+ m_output->println("result: Ok");
+ m_output->println("");
+}
+
template class MutexVector<int>;
template class Vector<ParserRow<MgmApiSession> const*>;
template class Vector<unsigned short>;
diff --git a/ndb/src/mgmsrv/Services.hpp b/ndb/src/mgmsrv/Services.hpp
index e47820826b6..bfc915f18f1 100644
--- a/ndb/src/mgmsrv/Services.hpp
+++ b/ndb/src/mgmsrv/Services.hpp
@@ -28,7 +28,9 @@
/** Undefine this to remove backwards compatibility for "GET CONFIG". */
#define MGM_GET_CONFIG_BACKWARDS_COMPAT
-class MgmApiSession : public SocketServer::Session {
+class MgmApiSession : public SocketServer::Session
+{
+ static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
private:
typedef Parser<MgmApiSession> Parser_t;
@@ -84,6 +86,8 @@ public:
void setParameter(Parser_t::Context &ctx, const class Properties &args);
void listen_event(Parser_t::Context &ctx, const class Properties &args);
+
+ void purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args);
void repCommand(Parser_t::Context &ctx, const class Properties &args);
};
diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp
index 15767e4766d..7a57fdeb77a 100644
--- a/ndb/src/mgmsrv/main.cpp
+++ b/ndb/src/mgmsrv/main.cpp
@@ -83,7 +83,6 @@ struct MgmGlobals {
int g_no_nodeid_checks= 0;
static MgmGlobals glob;
-
/******************************************************************************
* Function prototypes
******************************************************************************/
@@ -226,7 +225,7 @@ int main(int argc, char** argv)
if (!readGlobalConfig())
goto error_end;
- glob.mgmObject = new MgmtSrvr(glob.localNodeId,
+ glob.mgmObject = new MgmtSrvr(glob.localNodeId, glob.socketServer,
BaseString(glob.config_filename),
local_config,
glob.cluster_config);