diff options
-rw-r--r-- | acinclude.m4 | 7 | ||||
-rw-r--r-- | configure.in | 8 | ||||
-rw-r--r-- | mysql-test/mysql-test-run.sh | 11 | ||||
-rw-r--r-- | mysql-test/ndb/Makefile.am | 2 | ||||
-rw-r--r-- | mysql-test/ndb/ndbcluster.sh | 16 | ||||
-rw-r--r-- | ndb/include/mgmapi/mgmapi.h | 1 | ||||
-rw-r--r-- | ndb/include/util/Bitmask.hpp | 34 | ||||
-rw-r--r-- | ndb/include/util/SocketServer.hpp | 4 | ||||
-rw-r--r-- | ndb/src/common/mgmcommon/LocalConfig.cpp | 6 | ||||
-rw-r--r-- | ndb/src/common/mgmcommon/Makefile.am | 2 | ||||
-rw-r--r-- | ndb/src/common/mgmcommon/NdbConfig.c | 2 | ||||
-rw-r--r-- | ndb/src/common/util/SocketServer.cpp | 15 | ||||
-rw-r--r-- | ndb/src/mgmapi/mgmapi.cpp | 42 | ||||
-rw-r--r-- | ndb/src/mgmclient/CommandInterpreter.cpp | 122 | ||||
-rw-r--r-- | ndb/src/mgmclient/main.cpp | 46 | ||||
-rw-r--r-- | ndb/src/mgmclient/ndb_mgmclient.h | 33 | ||||
-rw-r--r-- | ndb/src/mgmclient/ndb_mgmclient.hpp | 8 | ||||
-rw-r--r-- | ndb/src/mgmsrv/ConfigInfo.cpp | 8 | ||||
-rw-r--r-- | ndb/src/mgmsrv/Makefile.am | 1 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.cpp | 88 | ||||
-rw-r--r-- | ndb/src/mgmsrv/MgmtSrvr.hpp | 11 | ||||
-rw-r--r-- | ndb/src/mgmsrv/Services.cpp | 42 | ||||
-rw-r--r-- | ndb/src/mgmsrv/Services.hpp | 6 | ||||
-rw-r--r-- | ndb/src/mgmsrv/main.cpp | 3 |
24 files changed, 422 insertions, 96 deletions
diff --git a/acinclude.m4 b/acinclude.m4 index 671e279a9f3..448bf7a2653 100644 --- a/acinclude.m4 +++ b/acinclude.m4 @@ -1614,9 +1614,14 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [ --with-ndb-docs Include the NDB Cluster ndbapi and mgmapi documentation], [ndb_docs="$withval"], [ndb_docs=no]) + AC_ARG_WITH([ndb-port], + [ + --with-ndb-port Port for NDB Cluster management server], + [ndb_port="$withval"], + [ndb_port="default"]) AC_ARG_WITH([ndb-port-base], [ - --with-ndb-port-base Base port for NDB Cluster], + --with-ndb-port-base Base port for NDB Cluster transporters], [ndb_port_base="$withval"], [ndb_port_base="default"]) diff --git a/configure.in b/configure.in index cd2daf10690..764261c6933 100644 --- a/configure.in +++ b/configure.in @@ -3052,9 +3052,15 @@ AC_SUBST([NDB_DEFS]) AC_SUBST([ndb_cxxflags_fix]) +if test X"$ndb_port" = Xdefault +then + ndb_port="1186" +fi +AC_SUBST([ndb_port]) + if test X"$ndb_port_base" = Xdefault then - ndb_port_base="2200" + ndb_port_base="2202" fi AC_SUBST([ndb_port_base]) diff --git a/mysql-test/mysql-test-run.sh b/mysql-test/mysql-test-run.sh index 65be9bf9c8e..fcb9e5eee2d 100644 --- a/mysql-test/mysql-test-run.sh +++ b/mysql-test/mysql-test-run.sh @@ -457,6 +457,9 @@ SMALL_SERVER="--key_buffer_size=1M --sort_buffer=256K --max_heap_table_size=1M" export MASTER_MYPORT MASTER_MYPORT1 SLAVE_MYPORT MYSQL_TCP_PORT MASTER_MYSOCK MASTER_MYSOCK1 +NDBCLUSTER_BASE_PORT=`expr $NDBCLUSTER_PORT + 2` +NDBCLUSTER_OPTS="--port=$NDBCLUSTER_PORT --port-base=$NDBCLUSTER_BASE_PORT --data-dir=$MYSQL_TEST_DIR/var" + if [ x$SOURCE_DIST = x1 ] ; then MY_BASEDIR=$MYSQL_TEST_DIR else @@ -939,11 +942,11 @@ start_ndbcluster() echo "Starting ndbcluster" if [ "$DO_BENCH" = 1 ] then - NDBCLUSTER_OPTS="" + NDBCLUSTER_EXTRA_OPTS="" else - NDBCLUSTER_OPTS="--small" + NDBCLUSTER_EXTRA_OPTS="--small" fi - ./ndb/ndbcluster --port-base=$NDBCLUSTER_PORT $NDBCLUSTER_OPTS --diskless --initial --data-dir=$MYSQL_TEST_DIR/var || exit 1 + ./ndb/ndbcluster $NDBCLUSTER_OPTS $NDBCLUSTER_EXTRA_OPTS --diskless --initial || exit 1 NDB_CONNECTSTRING="host=localhost:$NDBCLUSTER_PORT" else NDB_CONNECTSTRING="$USE_RUNNING_NDBCLUSTER" @@ -961,7 +964,7 @@ stop_ndbcluster() if [ -z "$USE_RUNNING_NDBCLUSTER" ] then # Kill any running ndbcluster stuff - ./ndb/ndbcluster --data-dir=$MYSQL_TEST_DIR/var --port-base=$NDBCLUSTER_PORT --stop + ./ndb/ndbcluster $NDBCLUSTER_OPTS --stop fi fi } diff --git a/mysql-test/ndb/Makefile.am b/mysql-test/ndb/Makefile.am index 3ed222344a6..502ccee099e 100644 --- a/mysql-test/ndb/Makefile.am +++ b/mysql-test/ndb/Makefile.am @@ -13,6 +13,8 @@ SUFFIXES = .sh .sh: @RM@ -f $@ $@-t @SED@ \ + -e 's!@''ndb_port''@!$(ndb_port)!g' \ + -e 's!@''ndb_port_base''@!$(ndb_port_base)!g' \ -e 's!@''ndbbindir''@!$(ndbbindir)!g' \ -e 's!@''ndbtoolsdir''@!$(ndbtoolsdir)!g' \ $< > $@-t diff --git a/mysql-test/ndb/ndbcluster.sh b/mysql-test/ndb/ndbcluster.sh index f5757a11056..3486a879eec 100644 --- a/mysql-test/ndb/ndbcluster.sh +++ b/mysql-test/ndb/ndbcluster.sh @@ -5,7 +5,8 @@ # This scripts starts the table handler ndbcluster # configurable parameters, make sure to change in mysqlcluterd as well -port_base="2200" +port=@ndb_port@ +port_base=@ndb_port_base@ fsdir=`pwd` # end configurable parameters @@ -84,6 +85,9 @@ while test $# -gt 0; do --data-dir=*) fsdir=`echo "$1" | sed -e "s;--data-dir=;;"` ;; + --port=*) + port=`echo "$1" | sed -e "s;--port=;;"` + ;; --port-base=*) port_base=`echo "$1" | sed -e "s;--port-base=;;"` ;; @@ -94,7 +98,7 @@ while test $# -gt 0; do shift done -fs_ndb="$fsdir/ndbcluster-$port_base" +fs_ndb="$fsdir/ndbcluster-$port" NDB_HOME= if [ ! -x "$fsdir" ]; then @@ -120,7 +124,7 @@ exec_ndb="$exec_ndb --no-defaults" exec_waiter="$exec_waiter --no-defaults" ndb_host="localhost" -ndb_mgmd_port=$port_base +ndb_mgmd_port=$port NDB_CONNECTSTRING="host=$ndb_host:$ndb_mgmd_port" export NDB_CONNECTSTRING @@ -158,10 +162,6 @@ if [ -d "$fs_ndb" ]; then :; else exit 1 fi -# set som help variables - -port_transporter=`expr $ndb_mgmd_port + 2` - # Start management server as deamon # Edit file system path and ports in config file @@ -176,7 +176,7 @@ sed \ -e s,"CHOOSE_HOSTNAME_".*,"$ndb_host",g \ -e s,"CHOOSE_FILESYSTEM","$fs_ndb",g \ -e s,"CHOOSE_PORT_MGM","$ndb_mgmd_port",g \ - -e s,"CHOOSE_PORT_TRANSPORTER","$port_transporter",g \ + -e s,"CHOOSE_PORT_TRANSPORTER","$port_base",g \ < ndb/ndb_config_2_node.ini \ > "$fs_ndb/config.ini" fi diff --git a/ndb/include/mgmapi/mgmapi.h b/ndb/include/mgmapi/mgmapi.h index 6dcf58b44e2..f1ef357421b 100644 --- a/ndb/include/mgmapi/mgmapi.h +++ b/ndb/include/mgmapi/mgmapi.h @@ -733,6 +733,7 @@ extern "C" { int param, unsigned long long * value); int ndb_mgm_get_string_parameter(const ndb_mgm_configuration_iterator*, int param, const char ** value); + int ndb_mgm_purge_stale_sessions(NdbMgmHandle handle, char **); #ifdef __cplusplus } #endif diff --git a/ndb/include/util/Bitmask.hpp b/ndb/include/util/Bitmask.hpp index bb217adab5f..19aa604e4a1 100644 --- a/ndb/include/util/Bitmask.hpp +++ b/ndb/include/util/Bitmask.hpp @@ -105,6 +105,11 @@ public: static void bitXOR(unsigned size, Uint32 data[], const Uint32 data2[]); /** + * bitXORC - Bitwise (x ^ ~y) into first operand. + */ + static void bitXORC(unsigned size, Uint32 data[], const Uint32 data2[]); + + /** * contains - Check if all bits set in data2 are set in data */ static bool contains(unsigned size, Uint32 data[], const Uint32 data2[]); @@ -261,6 +266,14 @@ BitmaskImpl::bitXOR(unsigned size, Uint32 data[], const Uint32 data2[]) } } +inline void +BitmaskImpl::bitXORC(unsigned size, Uint32 data[], const Uint32 data2[]) +{ + for (unsigned i = 0; i < size; i++) { + data[i] ^= ~data2[i]; + } +} + inline bool BitmaskImpl::contains(unsigned size, Uint32 data[], const Uint32 data2[]) { @@ -452,6 +465,12 @@ public: BitmaskPOD<size>& bitXOR(const BitmaskPOD<size>& mask2); /** + * bitXORC - Bitwise (x ^ ~y) into first operand. + */ + static void bitXORC(Uint32 data[], const Uint32 data2[]); + BitmaskPOD<size>& bitXORC(const BitmaskPOD<size>& mask2); + + /** * contains - Check if all bits set in data2 (that) are also set in data (this) */ static bool contains(Uint32 data[], const Uint32 data2[]); @@ -713,6 +732,21 @@ BitmaskPOD<size>::bitXOR(const BitmaskPOD<size>& mask2) } template <unsigned size> +inline void +BitmaskPOD<size>::bitXORC(Uint32 data[], const Uint32 data2[]) +{ + BitmaskImpl::bitXORC(size,data, data2); +} + +template <unsigned size> +inline BitmaskPOD<size>& +BitmaskPOD<size>::bitXORC(const BitmaskPOD<size>& mask2) +{ + BitmaskPOD<size>::bitXORC(rep.data, mask2.rep.data); + return *this; +} + +template <unsigned size> char * BitmaskPOD<size>::getText(const Uint32 data[], char* buf) { diff --git a/ndb/include/util/SocketServer.hpp b/ndb/include/util/SocketServer.hpp index 3860b9ca84b..2fad991e5f8 100644 --- a/ndb/include/util/SocketServer.hpp +++ b/ndb/include/util/SocketServer.hpp @@ -37,7 +37,7 @@ public: public: virtual ~Session() {} virtual void runSession(){} - virtual void stopSession(){} + virtual void stopSession(){ m_stop = true; } protected: friend class SocketServer; friend void* sessionThread_C(void*); @@ -98,6 +98,8 @@ public: */ void stopSessions(bool wait = false); + void foreachSession(void (*f)(SocketServer::Session*, void*), void *data); + private: struct SessionInstance { Service * m_service; diff --git a/ndb/src/common/mgmcommon/LocalConfig.cpp b/ndb/src/common/mgmcommon/LocalConfig.cpp index 3cd4341c6b7..679de716be0 100644 --- a/ndb/src/common/mgmcommon/LocalConfig.cpp +++ b/ndb/src/common/mgmcommon/LocalConfig.cpp @@ -90,7 +90,7 @@ LocalConfig::init(const char *connectString, //7. Check { char buf[256]; - BaseString::snprintf(buf, sizeof(buf), "host=localhost:%s", NDB_BASE_PORT); + BaseString::snprintf(buf, sizeof(buf), "host=localhost:%s", NDB_PORT); if(readConnectString(buf, "default connect string")) return true; } @@ -124,12 +124,12 @@ void LocalConfig::printUsage() const { ndbout << "1. Put a Ndb.cfg file in the directory where you start"<<endl << " the node. "<< endl << " Ex: Ndb.cfg" << endl - << " | host=localhost:"<<NDB_BASE_PORT<<endl; + << " | host=localhost:"<<NDB_PORT<<endl; ndbout << "2. Use the environment variable NDB_CONNECTSTRING to "<<endl << " provide this information." <<endl << " Ex: " << endl - << " >export NDB_CONNECTSTRING=\"host=localhost:"<<NDB_BASE_PORT<<"\"" + << " >export NDB_CONNECTSTRING=\"host=localhost:"<<NDB_PORT<<"\"" <<endl<<endl; } diff --git a/ndb/src/common/mgmcommon/Makefile.am b/ndb/src/common/mgmcommon/Makefile.am index ed6a526eb47..b787da51ab9 100644 --- a/ndb/src/common/mgmcommon/Makefile.am +++ b/ndb/src/common/mgmcommon/Makefile.am @@ -7,7 +7,7 @@ libmgmsrvcommon_la_SOURCES = \ INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi -I$(top_srcdir)/ndb/src/mgmsrv -DEFS_LOC = -DNDB_BASE_PORT="\"@ndb_port_base@\"" +DEFS_LOC = -DNDB_PORT="\"@ndb_port@\"" include $(top_srcdir)/ndb/config/common.mk.am include $(top_srcdir)/ndb/config/type_ndbapi.mk.am diff --git a/ndb/src/common/mgmcommon/NdbConfig.c b/ndb/src/common/mgmcommon/NdbConfig.c index e92f8fa8392..8adc4c20dff 100644 --- a/ndb/src/common/mgmcommon/NdbConfig.c +++ b/ndb/src/common/mgmcommon/NdbConfig.c @@ -74,7 +74,7 @@ NdbConfig_NdbCfgName(int with_ndb_home){ static char *get_prefix_buf(int len, int node_id) { - char tmp_buf[sizeof("ndb_pid#########")+1]; + char tmp_buf[sizeof("ndb_pid#############")+1]; char *buf; if (node_id > 0) snprintf(tmp_buf, sizeof(tmp_buf), "ndb_%u", node_id); diff --git a/ndb/src/common/util/SocketServer.cpp b/ndb/src/common/util/SocketServer.cpp index c3cffa1399b..8bee256684d 100644 --- a/ndb/src/common/util/SocketServer.cpp +++ b/ndb/src/common/util/SocketServer.cpp @@ -259,6 +259,15 @@ transfer(NDB_SOCKET_TYPE sock){ } void +SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data) +{ + for(int i = m_sessions.size() - 1; i >= 0; i--){ + (*func)(m_sessions[i].m_session, data); + } + checkSessions(); +} + +void SocketServer::checkSessions(){ for(int i = m_sessions.size() - 1; i >= 0; i--){ if(m_sessions[i].m_session->m_stopped){ @@ -278,8 +287,10 @@ void SocketServer::stopSessions(bool wait){ int i; for(i = m_sessions.size() - 1; i>=0; i--) - m_sessions[i].m_session->m_stop = true; - + { + m_sessions[i].m_session->stopSession(); + m_sessions[i].m_session->m_stop = true; // to make sure + } for(i = m_services.size() - 1; i>=0; i--) m_services[i].m_service->stopSessions(); diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp index 4b62df968b3..66f0dbb1842 100644 --- a/ndb/src/mgmapi/mgmapi.cpp +++ b/ndb/src/mgmapi/mgmapi.cpp @@ -1834,4 +1834,46 @@ ndb_mgm_set_string_parameter(NdbMgmHandle handle, return res; } +extern "C" +int +ndb_mgm_purge_stale_sessions(NdbMgmHandle handle, char **purged){ + CHECK_HANDLE(handle, 0); + CHECK_CONNECTED(handle, 0); + + Properties args; + + const ParserRow<ParserDummy> reply[]= { + MGM_CMD("purge stale sessions reply", NULL, ""), + MGM_ARG("purged", String, Optional, ""), + MGM_ARG("result", String, Mandatory, "Error message"), + MGM_END() + }; + + const Properties *prop; + prop= ndb_mgm_call(handle, reply, "purge stale sessions", &args); + + if(prop == NULL) { + SET_ERROR(handle, EIO, "Unable to purge stale sessions"); + return -1; + } + + int res= -1; + do { + const char * buf; + if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){ + ndbout_c("ERROR Message: %s\n", buf); + break; + } + if (purged) { + if (prop->get("purged", &buf)) + *purged= strdup(buf); + else + *purged= 0; + } + res= 0; + } while(0); + delete prop; + return res; +} + template class Vector<const ParserRow<ParserDummy>*>; diff --git a/ndb/src/mgmclient/CommandInterpreter.cpp b/ndb/src/mgmclient/CommandInterpreter.cpp index e802ffff5ce..f1a953769b8 100644 --- a/ndb/src/mgmclient/CommandInterpreter.cpp +++ b/ndb/src/mgmclient/CommandInterpreter.cpp @@ -17,17 +17,6 @@ #include <ndb_global.h> #include <my_sys.h> -// copied from mysql.cc to get readline -extern "C" { -#if defined( __WIN__) || defined(OS2) -#include <conio.h> -#elif !defined(__NETWARE__) -#include <readline/readline.h> -extern "C" int add_history(const char *command); /* From readline directory */ -#define HAVE_READLINE -#endif -} - //#define HAVE_GLOBAL_REPLICATION #include <Vector.hpp> @@ -65,7 +54,6 @@ public: * * @return true until quit/bye/exit has been typed */ - int readAndExecute(int _try_reconnect=-1); int execute(const char *_line, int _try_reconnect=-1); private: @@ -106,6 +94,7 @@ private: */ void executeHelp(char* parameters); void executeShow(char* parameters); + void executePurge(char* parameters); void executeShutdown(char* parameters); void executeRun(char* parameters); void executeInfo(char* parameters); @@ -179,6 +168,7 @@ private: */ #include "ndb_mgmclient.hpp" +#include "ndb_mgmclient.h" Ndb_mgmclient::Ndb_mgmclient(const char *host) { @@ -188,10 +178,6 @@ Ndb_mgmclient::~Ndb_mgmclient() { delete m_cmd; } -int Ndb_mgmclient::read_and_execute(int _try_reconnect) -{ - return m_cmd->readAndExecute(_try_reconnect); -} int Ndb_mgmclient::execute(const char *_line, int _try_reconnect) { return m_cmd->execute(_line,_try_reconnect); @@ -202,7 +188,20 @@ Ndb_mgmclient::disconnect() return m_cmd->disconnect(); } - +extern "C" { + Ndb_mgmclient_handle ndb_mgmclient_handle_create(const char *connect_string) + { + return (Ndb_mgmclient_handle) new Ndb_mgmclient(connect_string); + } + int ndb_mgmclient_execute(Ndb_mgmclient_handle h, int argc, const char** argv) + { + return ((Ndb_mgmclient*)h)->execute(argc, argv, 1); + } + int ndb_mgmclient_handle_destroy(Ndb_mgmclient_handle h) + { + delete (Ndb_mgmclient*)h; + } +} /* * The CommandInterpreter */ @@ -226,6 +225,17 @@ Ndb_mgmclient::disconnect() #include <util/InputStream.hpp> #include <util/OutputStream.hpp> +int Ndb_mgmclient::execute(int argc, const char** argv, int _try_reconnect) +{ + if (argc <= 0) + return 0; + BaseString _line(argv[0]); + for (int i= 1; i < argc; i++) + { + _line.appfmt(" %s", argv[i]); + } + return m_cmd->execute(_line.c_str(),_try_reconnect); +} /***************************************************************************** * HELP @@ -264,6 +274,7 @@ static const char* helpText = #ifdef HAVE_GLOBAL_REPLICATION "REP CONNECT <host:port> Connect to REP server on host:port\n" #endif +"PURGE STALE SESSIONS Reset reserved nodeid's in the mgmt server\n" "QUIT Quit management client\n" ; @@ -455,39 +466,6 @@ CommandInterpreter::disconnect() //***************************************************************************** int -CommandInterpreter::readAndExecute(int _try_reconnect) -{ - static char *line_read = (char *)NULL; - - /* If the buffer has already been allocated, return the memory - to the free pool. */ - if (line_read) - { - free (line_read); - line_read = (char *)NULL; - } -#ifdef HAVE_READLINE - /* Get a line from the user. */ - line_read = readline ("ndb_mgm> "); - /* If the line has any text in it, save it on the history. */ - if (line_read && *line_read) - add_history (line_read); -#else - static char linebuffer[254]; - fputs("ndb_mgm> ", stdout); - linebuffer[sizeof(linebuffer)-1]=0; - line_read = fgets(linebuffer, sizeof(linebuffer)-1, stdin); - if (line_read == linebuffer) { - char *q=linebuffer; - while (*q > 31) q++; - *q=0; - line_read= strdup(linebuffer); - } -#endif - return execute(line_read,_try_reconnect); -} - -int CommandInterpreter::execute(const char *_line, int _try_reconnect) { if (_try_reconnect >= 0) @@ -541,6 +519,10 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect) executeAbortBackup(allAfterFirstToken); return true; } + else if (strcmp(firstToken, "PURGE") == 0) { + executePurge(allAfterFirstToken); + return true; + } #ifdef HAVE_GLOBAL_REPLICATION else if(strcmp(firstToken, "REPLICATION") == 0 || strcmp(firstToken, "REP") == 0) { @@ -983,6 +965,46 @@ print_nodes(ndb_mgm_cluster_state *state, ndb_mgm_configuration_iterator *it, } void +CommandInterpreter::executePurge(char* parameters) +{ + int command_ok= 0; + do { + if (emptyString(parameters)) + break; + char* firstToken = strtok(parameters, " "); + char* nextToken = strtok(NULL, " \0"); + if (strcmp(firstToken,"STALE") == 0 && + nextToken && + strcmp(nextToken, "SESSIONS") == 0) { + command_ok= 1; + break; + } + } while(0); + + if (!command_ok) { + ndbout_c("Unexpected command, expected: PURGE STALE SESSIONS"); + return; + } + + int i; + char *str; + connect(); + + if (ndb_mgm_purge_stale_sessions(m_mgmsrv, &str)) { + ndbout_c("Command failed"); + return; + } + if (str) { + ndbout_c("Purged sessions with node id's: %s", str); + free(str); + } + else + { + ndbout_c("No sessions purged"); + } +} + +void CommandInterpreter::executeShow(char* parameters) { int i; diff --git a/ndb/src/mgmclient/main.cpp b/ndb/src/mgmclient/main.cpp index 3415ede1985..8f5d9e6656c 100644 --- a/ndb/src/mgmclient/main.cpp +++ b/ndb/src/mgmclient/main.cpp @@ -17,6 +17,17 @@ #include <ndb_global.h> #include <ndb_opts.h> +// copied from mysql.cc to get readline +extern "C" { +#if defined( __WIN__) || defined(OS2) +#include <conio.h> +#elif !defined(__NETWARE__) +#include <readline/readline.h> +extern "C" int add_history(const char *command); /* From readline directory */ +#define HAVE_READLINE +#endif +} + #include <NdbMain.h> #include <NdbHost.h> #include <mgmapi.h> @@ -90,6 +101,39 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), return 0; } +static int +read_and_execute(int _try_reconnect) +{ + static char *line_read = (char *)NULL; + + /* If the buffer has already been allocated, return the memory + to the free pool. */ + if (line_read) + { + free (line_read); + line_read = (char *)NULL; + } +#ifdef HAVE_READLINE + /* Get a line from the user. */ + line_read = readline ("ndb_mgm> "); + /* If the line has any text in it, save it on the history. */ + if (line_read && *line_read) + add_history (line_read); +#else + static char linebuffer[254]; + fputs("ndb_mgm> ", stdout); + linebuffer[sizeof(linebuffer)-1]=0; + line_read = fgets(linebuffer, sizeof(linebuffer)-1, stdin); + if (line_read == linebuffer) { + char *q=linebuffer; + while (*q > 31) q++; + *q=0; + line_read= strdup(linebuffer); + } +#endif + return com->execute(line_read,_try_reconnect); +} + int main(int argc, char** argv){ NDB_INIT(argv[0]); const char *_host = 0; @@ -128,7 +172,7 @@ int main(int argc, char** argv){ signal(SIGPIPE, handler); com = new Ndb_mgmclient(buf); - while(com->read_and_execute(_try_reconnect)); + while(read_and_execute(_try_reconnect)); delete com; return 0; diff --git a/ndb/src/mgmclient/ndb_mgmclient.h b/ndb/src/mgmclient/ndb_mgmclient.h new file mode 100644 index 00000000000..265e6bc67ec --- /dev/null +++ b/ndb/src/mgmclient/ndb_mgmclient.h @@ -0,0 +1,33 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef Ndb_mgmclient_h +#define Ndb_mgmclient_h + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void* Ndb_mgmclient_handle; +Ndb_mgmclient_handle ndb_mgmclient_handle_create(const char *connect_string); +int ndb_mgmclient_execute(Ndb_mgmclient_handle, int argc, const char** argv); +int ndb_mgmclient_handle_destroy(Ndb_mgmclient_handle); + +#ifdef __cplusplus +} +#endif + +#endif /* Ndb_mgmclient_h */ diff --git a/ndb/src/mgmclient/ndb_mgmclient.hpp b/ndb/src/mgmclient/ndb_mgmclient.hpp index 2f021a0f2b6..933d1bab5ce 100644 --- a/ndb/src/mgmclient/ndb_mgmclient.hpp +++ b/ndb/src/mgmclient/ndb_mgmclient.hpp @@ -14,8 +14,8 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#ifndef Ndb_mgmclient_h -#define Ndb_mgmclient_h +#ifndef Ndb_mgmclient_hpp +#define Ndb_mgmclient_hpp class CommandInterpreter; class Ndb_mgmclient @@ -23,11 +23,11 @@ class Ndb_mgmclient public: Ndb_mgmclient(const char*); ~Ndb_mgmclient(); - int read_and_execute(int _try_reconnect=-1); int execute(const char *_line, int _try_reconnect=-1); + int execute(int argc, const char** argv, int _try_reconnect=-1); int disconnect(); private: CommandInterpreter *m_cmd; }; -#endif // Ndb_mgmclient_h +#endif // Ndb_mgmclient_hpp diff --git a/ndb/src/mgmsrv/ConfigInfo.cpp b/ndb/src/mgmsrv/ConfigInfo.cpp index ad346b30ead..4af1556a0c0 100644 --- a/ndb/src/mgmsrv/ConfigInfo.cpp +++ b/ndb/src/mgmsrv/ConfigInfo.cpp @@ -1478,7 +1478,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::USED, false, ConfigInfo::INT, - NDB_BASE_PORT, + NDB_PORT, "0", STR_VALUE(MAX_INT_RNIL) }, @@ -1490,7 +1490,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ConfigInfo::USED, false, ConfigInfo::INT, - "2199", + UNDEFINED, "0", STR_VALUE(MAX_INT_RNIL) }, @@ -3010,7 +3010,7 @@ fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){ if(!(ctx.m_userDefaults && ctx.m_userDefaults->get("PortNumber", &base)) && !ctx.m_systemDefaults->get("PortNumber", &base)) { - base= strtoll(NDB_BASE_PORT,0,0)+2; + base= strtoll(NDB_BASE_PORT,0,0); // ctx.reportError("Cannot retrieve base port number"); // return false; } @@ -3442,7 +3442,7 @@ static bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>§ions, #if 0 Properties * props= ctx.m_config; Properties computers(true); - Uint32 port_base = NDB_BASE_PORT+2; + Uint32 port_base = NDB_BASE_PORT; Uint32 nNodes; ctx.m_userProperties.get("NoOfNodes", &nNodes); diff --git a/ndb/src/mgmsrv/Makefile.am b/ndb/src/mgmsrv/Makefile.am index 4cb164d48fe..ee5220ef9f8 100644 --- a/ndb/src/mgmsrv/Makefile.am +++ b/ndb/src/mgmsrv/Makefile.am @@ -33,6 +33,7 @@ DEFS_LOC = -DDEFAULT_MYSQL_HOME="\"$(MYSQLBASEdir)\"" \ -DDATADIR="\"$(MYSQLDATAdir)\"" \ -DSHAREDIR="\"$(MYSQLSHAREdir)\"" \ -DMYSQLCLUSTERDIR="\"$(MYSQLCLUSTERdir)\"" \ + -DNDB_PORT="\"@ndb_port@\"" \ -DNDB_BASE_PORT="\"@ndb_port_base@\"" include $(top_srcdir)/ndb/config/common.mk.am diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index 2e30d73290b..a49b29af275 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -400,11 +400,13 @@ MgmtSrvr::getPort() const { /* Constructor */ MgmtSrvr::MgmtSrvr(NodeId nodeId, + SocketServer *socket_server, const BaseString &configFilename, LocalConfig &local_config, Config * config): _blockNumber(1), // Hard coded block number since it makes it easy to send // signals to other management servers. + m_socket_server(socket_server), _ownReference(0), m_local_config(local_config), theSignalIdleList(NULL), @@ -2094,6 +2096,25 @@ MgmtSrvr::getNodeType(NodeId nodeId) const return nodeTypes[nodeId]; } +void +MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const +{ + if (theFacade && theFacade->theClusterMgr) + { + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) + { + const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); + if (node.connected) + { + connected_nodes.bitOR(node.m_state.m_connected_nodes); + } + } + } + } +} + bool MgmtSrvr::alloc_node_id(NodeId * nodeId, enum ndb_mgm_node_type type, @@ -2106,7 +2127,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, *nodeId, type, client_addr)); if (g_no_nodeid_checks) { if (*nodeId == 0) { - error_string.appfmt("no-nodeid-ckecks set in manegment server.\n" + error_string.appfmt("no-nodeid-checks set in management server.\n" "node id must be set explicitly in connectstring"); DBUG_RETURN(false); } @@ -2115,16 +2136,11 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, Guard g(m_node_id_mutex); int no_mgm= 0; NodeBitmask connected_nodes(m_reserved_nodes); - for(Uint32 i = 0; i < MAX_NODES; i++) + get_connected_nodes(connected_nodes); { - if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB && - theFacade && theFacade->theClusterMgr) { - const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); - if (node.connected) { - connected_nodes.bitOR(node.m_state.m_connected_nodes); - } - } else if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM) - no_mgm++; + for(Uint32 i = 0; i < MAX_NODES; i++) + if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM) + no_mgm++; } bool found_matching_id= false; bool found_matching_type= false; @@ -2227,6 +2243,10 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, m_connect_address[id_found].s_addr= 0; } m_reserved_nodes.set(id_found); + char tmp_str[128]; + m_reserved_nodes.getText(tmp_str); + g_EventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, m_reserved_nodes %s.", + id_found, get_connect_address(id_found), tmp_str); DBUG_RETURN(true); } @@ -2283,6 +2303,36 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, error_string.appfmt("No node defined with id=%d in config file.", *nodeId); } + + g_EventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s. " + "Returned error string \"%s\"", + *nodeId, + client_addr != 0 ? inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr) : "<none>", + error_string.c_str()); + + NodeBitmask connected_nodes2; + get_connected_nodes(connected_nodes2); + { + BaseString tmp_connected, tmp_not_connected; + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if (connected_nodes2.get(i)) + { + if (!m_reserved_nodes.get(i)) + tmp_connected.appfmt(" %d", i); + } + else if (m_reserved_nodes.get(i)) + { + tmp_not_connected.appfmt(" %d", i); + } + } + if (tmp_connected.length() > 0) + g_EventLogger.info("Mgmt server state: node id's %s connected but not reserved", + tmp_connected.c_str()); + if (tmp_not_connected.length() > 0) + g_EventLogger.info("Mgmt server state: node id's %s not connected but reserved", + tmp_not_connected.c_str()); + } DBUG_RETURN(false); } @@ -2531,10 +2581,15 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() { Guard g(m_mgmsrv.m_node_id_mutex); if (!m_reserved_nodes.isclear()) { + m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); // node has been reserved, force update signal to ndb nodes global_flag_send_heartbeat_now= 1; + + char tmp_str[128]; + m_mgmsrv.m_reserved_nodes.getText(tmp_str); + g_EventLogger.info("Mgmt server state: nodeid %d freed, m_reserved_nodes %s.", + get_nodeid(), tmp_str); } - m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); } void @@ -2543,6 +2598,17 @@ MgmtSrvr::Allocated_resources::reserve_node(NodeId id) m_reserved_nodes.set(id); } +NodeId +MgmtSrvr::Allocated_resources::get_nodeid() const +{ + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if (m_reserved_nodes.get(i)) + return i; + } + return 0; +} + int MgmtSrvr::setDbParameter(int node, int param, const char * value, BaseString& msg){ diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index c796e1e9219..b3257491123 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -96,7 +96,10 @@ public: // methods to reserve/allocate resources which // will be freed when running destructor void reserve_node(NodeId id); - bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId);} + bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); } + bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); } + bool isclear() { return m_reserved_nodes.isclear(); } + NodeId get_nodeid() const; private: MgmtSrvr &m_mgmsrv; NodeBitmask m_reserved_nodes; @@ -173,6 +176,7 @@ public: /* Constructor */ MgmtSrvr(NodeId nodeId, /* Local nodeid */ + SocketServer *socket_server, const BaseString &config_filename, /* Where to save config */ LocalConfig &local_config, /* Ndb.cfg filename */ Config * config); @@ -499,6 +503,9 @@ public: int setDbParameter(int node, int parameter, const char * value, BaseString&); const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); } + void get_connected_nodes(NodeBitmask &connected_nodes) const; + SocketServer *get_socket_server() { return m_socket_server; } + //************************************************************************** private: //************************************************************************** @@ -525,6 +532,8 @@ private: int _blockNumber; NodeId _ownNodeId; + SocketServer *m_socket_server; + BlockReference _ownReference; NdbMutex *m_configMutex; const Config * _config; diff --git a/ndb/src/mgmsrv/Services.cpp b/ndb/src/mgmsrv/Services.cpp index 2672d8c9d4b..0394c4e80bb 100644 --- a/ndb/src/mgmsrv/Services.cpp +++ b/ndb/src/mgmsrv/Services.cpp @@ -242,6 +242,8 @@ ParserRow<MgmApiSession> commands[] = { MGM_ARG("node", Int, Optional, "Node"), MGM_ARG("filter", String, Mandatory, "Event category"), + MGM_CMD("purge stale sessions", &MgmApiSession::purge_stale_sessions, ""), + MGM_END() }; @@ -1412,6 +1414,46 @@ done: m_output->println(""); } +struct PurgeStruct +{ + NodeBitmask free_nodes;/* free nodes as reported + * by ndbd in apiRegReqConf + */ + BaseString *str; +}; + +void +MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data) +{ + MgmApiSession *s= (MgmApiSession *)_s; + struct PurgeStruct &ps= *(struct PurgeStruct *)data; + if (s->m_allocated_resources->is_reserved(ps.free_nodes)) + { + ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid()); + s->stopSession(); + } +} + +void +MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx, + const class Properties &args) +{ + struct PurgeStruct ps; + BaseString str; + ps.str = &str; + + m_mgmsrv.get_connected_nodes(ps.free_nodes); + ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes + + m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps); + + m_output->println("purge stale sessions reply"); + if (str.length() > 0) + m_output->println("purged:%s",str.c_str()); + m_output->println("result: Ok"); + m_output->println(""); +} + template class MutexVector<int>; template class Vector<ParserRow<MgmApiSession> const*>; template class Vector<unsigned short>; diff --git a/ndb/src/mgmsrv/Services.hpp b/ndb/src/mgmsrv/Services.hpp index e47820826b6..bfc915f18f1 100644 --- a/ndb/src/mgmsrv/Services.hpp +++ b/ndb/src/mgmsrv/Services.hpp @@ -28,7 +28,9 @@ /** Undefine this to remove backwards compatibility for "GET CONFIG". */ #define MGM_GET_CONFIG_BACKWARDS_COMPAT -class MgmApiSession : public SocketServer::Session { +class MgmApiSession : public SocketServer::Session +{ + static void stop_session_if_not_connected(SocketServer::Session *_s, void *data); private: typedef Parser<MgmApiSession> Parser_t; @@ -84,6 +86,8 @@ public: void setParameter(Parser_t::Context &ctx, const class Properties &args); void listen_event(Parser_t::Context &ctx, const class Properties &args); + + void purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args); void repCommand(Parser_t::Context &ctx, const class Properties &args); }; diff --git a/ndb/src/mgmsrv/main.cpp b/ndb/src/mgmsrv/main.cpp index 15767e4766d..7a57fdeb77a 100644 --- a/ndb/src/mgmsrv/main.cpp +++ b/ndb/src/mgmsrv/main.cpp @@ -83,7 +83,6 @@ struct MgmGlobals { int g_no_nodeid_checks= 0; static MgmGlobals glob; - /****************************************************************************** * Function prototypes ******************************************************************************/ @@ -226,7 +225,7 @@ int main(int argc, char** argv) if (!readGlobalConfig()) goto error_end; - glob.mgmObject = new MgmtSrvr(glob.localNodeId, + glob.mgmObject = new MgmtSrvr(glob.localNodeId, glob.socketServer, BaseString(glob.config_filename), local_config, glob.cluster_config); |