summaryrefslogtreecommitdiff
path: root/storage/ndb/src/cw/cpcd
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/cw/cpcd')
-rw-r--r--storage/ndb/src/cw/cpcd/APIService.cpp386
-rw-r--r--storage/ndb/src/cw/cpcd/APIService.hpp64
-rw-r--r--storage/ndb/src/cw/cpcd/CPCD.cpp435
-rw-r--r--storage/ndb/src/cw/cpcd/CPCD.hpp382
-rw-r--r--storage/ndb/src/cw/cpcd/Makefile.am20
-rw-r--r--storage/ndb/src/cw/cpcd/Monitor.cpp79
-rw-r--r--storage/ndb/src/cw/cpcd/Process.cpp479
-rw-r--r--storage/ndb/src/cw/cpcd/common.cpp98
-rw-r--r--storage/ndb/src/cw/cpcd/common.hpp36
-rw-r--r--storage/ndb/src/cw/cpcd/main.cpp187
10 files changed, 2166 insertions, 0 deletions
diff --git a/storage/ndb/src/cw/cpcd/APIService.cpp b/storage/ndb/src/cw/cpcd/APIService.cpp
new file mode 100644
index 00000000000..63d0aaafe86
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/APIService.cpp
@@ -0,0 +1,386 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <Parser.hpp>
+#include <NdbOut.hpp>
+#include <Properties.hpp>
+#include <socket_io.h>
+
+#include "APIService.hpp"
+#include "CPCD.hpp"
+#include <NdbMutex.h>
+#include <OutputStream.hpp>
+
+/**
+ const char * name;
+ const char * realName;
+ const Type type;
+ const ArgType argType;
+ const ArgRequired argRequired;
+ const ArgMinMax argMinMax;
+ const int minVal;
+ const int maxVal;
+ void (T::* function)(const class Properties & args);
+ const char * description;
+*/
+
+#define CPCD_CMD(name, fun, desc) \
+ { name, \
+ 0, \
+ ParserRow<CPCDAPISession>::Cmd, \
+ ParserRow<CPCDAPISession>::String, \
+ ParserRow<CPCDAPISession>::Optional, \
+ ParserRow<CPCDAPISession>::IgnoreMinMax, \
+ 0, 0, \
+ fun, \
+ desc, 0 }
+
+#define CPCD_ARG(name, type, opt, desc) \
+ { name, \
+ 0, \
+ ParserRow<CPCDAPISession>::Arg, \
+ ParserRow<CPCDAPISession>::type, \
+ ParserRow<CPCDAPISession>::opt, \
+ ParserRow<CPCDAPISession>::IgnoreMinMax, \
+ 0, 0, \
+ 0, \
+ desc, 0 }
+
+#define CPCD_ARG2(name, type, opt, min, max, desc) \
+ { name, \
+ 0, \
+ ParserRow<CPCDAPISession>::Arg, \
+ ParserRow<CPCDAPISession>::type, \
+ ParserRow<CPCDAPISession>::opt, \
+ ParserRow<CPCDAPISession>::IgnoreMinMax, \
+ min, max, \
+ 0, \
+ desc, 0 }
+
+#define CPCD_END() \
+ { 0, \
+ 0, \
+ ParserRow<CPCDAPISession>::Arg, \
+ ParserRow<CPCDAPISession>::Int, \
+ ParserRow<CPCDAPISession>::Optional, \
+ ParserRow<CPCDAPISession>::IgnoreMinMax, \
+ 0, 0, \
+ 0, \
+ 0, 0 }
+
+#define CPCD_CMD_ALIAS(name, realName, fun) \
+ { name, \
+ realName, \
+ ParserRow<CPCDAPISession>::CmdAlias, \
+ ParserRow<CPCDAPISession>::Int, \
+ ParserRow<CPCDAPISession>::Optional, \
+ ParserRow<CPCDAPISession>::IgnoreMinMax, \
+ 0, 0, \
+ 0, \
+ 0, 0 }
+
+#define CPCD_ARG_ALIAS(name, realName, fun) \
+ { name, \
+ realName, \
+ ParserRow<CPCDAPISession>::ArgAlias, \
+ ParserRow<CPCDAPISession>::Int, \
+ ParserRow<CPCDAPISession>::Optional, \
+ ParserRow<CPCDAPISession>::IgnoreMinMax, \
+ 0, 0, \
+ 0, \
+ 0, 0 }
+
+const
+ParserRow<CPCDAPISession> commands[] =
+{
+ CPCD_CMD("define process" , &CPCDAPISession::defineProcess, ""),
+ CPCD_ARG("id", Int, Optional, "Id of process."),
+ CPCD_ARG("name", String, Mandatory, "Name of process"),
+ CPCD_ARG("group", String, Mandatory, "Group of process"),
+ CPCD_ARG("env", String, Optional, "Environment variables for process"),
+ CPCD_ARG("path", String, Mandatory, "Path to binary"),
+ CPCD_ARG("args", String, Optional, "Arguments to process"),
+ CPCD_ARG("type", String, Mandatory, "Type of process"),
+ CPCD_ARG("cwd", String, Mandatory, "Working directory of process"),
+ CPCD_ARG("owner", String, Mandatory, "Owner of process"),
+ CPCD_ARG("runas", String, Optional, "Run as user"),
+ CPCD_ARG("stdout", String, Optional, "Redirection of stdout"),
+ CPCD_ARG("stderr", String, Optional, "Redirection of stderr"),
+ CPCD_ARG("stdin", String, Optional, "Redirection of stderr"),
+ CPCD_ARG("ulimit", String, Optional, "ulimit"),
+
+ CPCD_CMD("undefine process", &CPCDAPISession::undefineProcess, ""),
+ CPCD_CMD_ALIAS("undef", "undefine process", 0),
+ CPCD_ARG("id", Int, Mandatory, "Id of process"),
+ CPCD_ARG_ALIAS("i", "id", 0),
+
+ CPCD_CMD("start process", &CPCDAPISession::startProcess, ""),
+ CPCD_ARG("id", Int, Mandatory, "Id of process"),
+
+ CPCD_CMD("stop process", &CPCDAPISession::stopProcess, ""),
+ CPCD_ARG("id", Int, Mandatory, "Id of process"),
+
+ CPCD_CMD("list processes", &CPCDAPISession::listProcesses, ""),
+
+ CPCD_END()
+};
+CPCDAPISession::CPCDAPISession(NDB_SOCKET_TYPE sock,
+ CPCD & cpcd)
+ : SocketServer::Session(sock)
+ , m_cpcd(cpcd)
+{
+ m_input = new SocketInputStream(sock);
+ m_output = new SocketOutputStream(sock);
+ m_parser = new Parser<CPCDAPISession>(commands, *m_input, true, true, true);
+}
+
+CPCDAPISession::CPCDAPISession(FILE * f, CPCD & cpcd)
+ : SocketServer::Session(1)
+ , m_cpcd(cpcd)
+{
+ m_input = new FileInputStream(f);
+ m_parser = new Parser<CPCDAPISession>(commands, *m_input, true, true, true);
+}
+
+CPCDAPISession::~CPCDAPISession() {
+ delete m_input;
+ delete m_parser;
+}
+
+void
+CPCDAPISession::runSession(){
+ Parser_t::Context ctx;
+ while(!m_stop){
+ m_parser->run(ctx, * this);
+ if(ctx.m_currentToken == 0)
+ break;
+
+ switch(ctx.m_status){
+ case Parser_t::Ok:
+ for(size_t i = 0; i<ctx.m_aliasUsed.size(); i++)
+ ndbout_c("Used alias: %s -> %s",
+ ctx.m_aliasUsed[i]->name, ctx.m_aliasUsed[i]->realName);
+ break;
+ case Parser_t::NoLine:
+ case Parser_t::EmptyLine:
+ break;
+ default:
+ break;
+ }
+ }
+ NDB_CLOSE_SOCKET(m_socket);
+}
+
+void
+CPCDAPISession::stopSession(){
+ CPCD::RequestStatus rs;
+ for(size_t i = 0; i<m_temporaryProcesses.size(); i++){
+ Uint32 id = m_temporaryProcesses[i];
+ m_cpcd.undefineProcess(&rs, id);
+ }
+}
+
+void
+CPCDAPISession::loadFile(){
+ Parser_t::Context ctx;
+ while(!m_stop){
+ m_parser->run(ctx, * this);
+ if(ctx.m_currentToken == 0)
+ break;
+
+ switch(ctx.m_status){
+ case Parser_t::Ok:
+ for(size_t i = 0; i<ctx.m_aliasUsed.size(); i++)
+ ndbout_c("Used alias: %s -> %s",
+ ctx.m_aliasUsed[i]->name, ctx.m_aliasUsed[i]->realName);
+ break;
+ case Parser_t::NoLine:
+ case Parser_t::EmptyLine:
+ break;
+ default:
+ break;
+ }
+ }
+}
+
+static const int g_TimeOut = 1000;
+
+void
+CPCDAPISession::defineProcess(Parser_t::Context & /* unused */,
+ const class Properties & args){
+
+ CPCD::Process * p = new CPCD::Process(args, &m_cpcd);
+
+ CPCD::RequestStatus rs;
+
+ bool ret = m_cpcd.defineProcess(&rs, p);
+ if(!m_cpcd.loadingProcessList) {
+ m_output->println("define process");
+ m_output->println("status: %d", rs.getStatus());
+ if(ret == true){
+ m_output->println("id: %d", p->m_id);
+ if(p->m_processType == TEMPORARY){
+ m_temporaryProcesses.push_back(p->m_id);
+ }
+ } else {
+ m_output->println("errormessage: %s", rs.getErrMsg());
+ }
+ m_output->println("");
+ }
+}
+
+void
+CPCDAPISession::undefineProcess(Parser_t::Context & /* unused */,
+ const class Properties & args){
+ Uint32 id;
+ CPCD::RequestStatus rs;
+
+ args.get("id", &id);
+ bool ret = m_cpcd.undefineProcess(&rs, id);
+
+ m_output->println("undefine process");
+ m_output->println("id: %d", id);
+ m_output->println("status: %d", rs.getStatus());
+ if(!ret)
+ m_output->println("errormessage: %s", rs.getErrMsg());
+
+ m_output->println("");
+}
+
+void
+CPCDAPISession::startProcess(Parser_t::Context & /* unused */,
+ const class Properties & args){
+ Uint32 id;
+ CPCD::RequestStatus rs;
+
+ args.get("id", &id);
+ const int ret = m_cpcd.startProcess(&rs, id);
+
+ if(!m_cpcd.loadingProcessList) {
+ m_output->println("start process");
+ m_output->println("id: %d", id);
+ m_output->println("status: %d", rs.getStatus());
+ if(!ret)
+ m_output->println("errormessage: %s", rs.getErrMsg());
+ m_output->println("");
+ }
+}
+
+void
+CPCDAPISession::stopProcess(Parser_t::Context & /* unused */,
+ const class Properties & args){
+ Uint32 id;
+ CPCD::RequestStatus rs;
+
+ args.get("id", &id);
+ int ret = m_cpcd.stopProcess(&rs, id);
+
+ m_output->println("stop process");
+ m_output->println("id: %d", id);
+ m_output->println("status: %d", rs.getStatus());
+ if(!ret)
+ m_output->println("errormessage: %s", rs.getErrMsg());
+
+ m_output->println("");
+}
+
+static const char *
+propToString(Properties *prop, const char *key) {
+ static char buf[32];
+ const char *retval = NULL;
+ PropertiesType pt;
+
+ prop->getTypeOf(key, &pt);
+ switch(pt) {
+ case PropertiesType_Uint32:
+ Uint32 val;
+ prop->get(key, &val);
+ BaseString::snprintf(buf, sizeof buf, "%d", val);
+ retval = buf;
+ break;
+ case PropertiesType_char:
+ const char *str;
+ prop->get(key, &str);
+ retval = str;
+ break;
+ default:
+ BaseString::snprintf(buf, sizeof buf, "(unknown)");
+ retval = buf;
+ }
+ return retval;
+}
+
+void
+CPCDAPISession::printProperty(Properties *prop, const char *key) {
+ m_output->println("%s: %s", key, propToString(prop, key));
+}
+
+void
+CPCDAPISession::listProcesses(Parser_t::Context & /* unused */,
+ const class Properties & /* unused */){
+ m_cpcd.m_processes.lock();
+ MutexVector<CPCD::Process *> *proclist = m_cpcd.getProcessList();
+
+ m_output->println("start processes");
+ m_output->println("");
+
+
+ for(size_t i = 0; i < proclist->size(); i++) {
+ CPCD::Process *p = (*proclist)[i];
+
+ m_output->println("process");
+
+ m_output->println("id: %d", p->m_id);
+ m_output->println("name: %s", p->m_name.c_str());
+ m_output->println("path: %s", p->m_path.c_str());
+ m_output->println("args: %s", p->m_args.c_str());
+ m_output->println("type: %s", p->m_type.c_str());
+ m_output->println("cwd: %s", p->m_cwd.c_str());
+ m_output->println("env: %s", p->m_env.c_str());
+ m_output->println("owner: %s", p->m_owner.c_str());
+ m_output->println("group: %s", p->m_group.c_str());
+ m_output->println("runas: %s", p->m_runas.c_str());
+ m_output->println("stdin: %s", p->m_stdin.c_str());
+ m_output->println("stdout: %s", p->m_stdout.c_str());
+ m_output->println("stderr: %s", p->m_stderr.c_str());
+ m_output->println("ulimit: %s", p->m_ulimit.c_str());
+ switch(p->m_status){
+ case STOPPED:
+ m_output->println("status: stopped");
+ break;
+ case STARTING:
+ m_output->println("status: starting");
+ break;
+ case RUNNING:
+ m_output->println("status: running");
+ break;
+ case STOPPING:
+ m_output->println("status: stopping");
+ break;
+ }
+
+ m_output->println("");
+
+ }
+
+ m_output->println("end processes");
+ m_output->println("");
+
+ m_cpcd.m_processes.unlock();
+}
+
+template class Vector<ParserRow<CPCDAPISession> const*>;
diff --git a/storage/ndb/src/cw/cpcd/APIService.hpp b/storage/ndb/src/cw/cpcd/APIService.hpp
new file mode 100644
index 00000000000..ef988785f89
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/APIService.hpp
@@ -0,0 +1,64 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef CPCD_API_HPP
+#define CPCD_API_HPP
+
+#include <Parser.hpp>
+#include <InputStream.hpp>
+#include <SocketServer.hpp>
+
+class CPCD;
+
+class CPCDAPISession : public SocketServer::Session {
+ typedef Parser<CPCDAPISession> Parser_t;
+
+ class CPCD & m_cpcd;
+ InputStream *m_input;
+ OutputStream *m_output;
+ Parser_t *m_parser;
+
+ Vector<int> m_temporaryProcesses;
+
+ void printProperty(Properties *prop, const char *key);
+public:
+ CPCDAPISession(NDB_SOCKET_TYPE, class CPCD &);
+ CPCDAPISession(FILE * f, CPCD & cpcd);
+ ~CPCDAPISession();
+
+ virtual void runSession();
+ virtual void stopSession();
+ void loadFile();
+
+ void defineProcess(Parser_t::Context & ctx, const class Properties & args);
+ void undefineProcess(Parser_t::Context & ctx, const class Properties & args);
+ void startProcess(Parser_t::Context & ctx, const class Properties & args);
+ void stopProcess(Parser_t::Context & ctx, const class Properties & args);
+ void showProcess(Parser_t::Context & ctx, const class Properties & args);
+ void listProcesses(Parser_t::Context & ctx, const class Properties & args);
+};
+
+class CPCDAPIService : public SocketServer::Service {
+ class CPCD & m_cpcd;
+public:
+ CPCDAPIService(class CPCD & cpcd) : m_cpcd(cpcd) {}
+
+ CPCDAPISession * newSession(NDB_SOCKET_TYPE theSock){
+ return new CPCDAPISession(theSock, m_cpcd);
+ }
+};
+
+#endif
diff --git a/storage/ndb/src/cw/cpcd/CPCD.cpp b/storage/ndb/src/cw/cpcd/CPCD.cpp
new file mode 100644
index 00000000000..69a7b840528
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/CPCD.cpp
@@ -0,0 +1,435 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <ndb_global.h>
+#include <NdbOut.hpp>
+
+#include "APIService.hpp"
+#include "CPCD.hpp"
+#include <NdbMutex.h>
+
+#include "common.hpp"
+
+extern const ParserRow<CPCDAPISession> commands[];
+
+
+CPCD::CPCD() {
+ loadingProcessList = false;
+ m_processes.clear();
+ m_monitor = NULL;
+ m_monitor = new Monitor(this);
+ m_procfile = "ndb_cpcd.db";
+}
+
+CPCD::~CPCD() {
+ if(m_monitor != NULL) {
+ delete m_monitor;
+ m_monitor = NULL;
+ }
+}
+
+int
+CPCD::findUniqueId() {
+ int id;
+ bool ok = false;
+ m_processes.lock();
+
+ while(!ok) {
+ ok = true;
+ id = random() % 8192; /* Don't want so big numbers */
+
+ if(id == 0)
+ ok = false;
+
+ for(size_t i = 0; i<m_processes.size(); i++) {
+ if(m_processes[i]->m_id == id)
+ ok = false;
+ }
+ }
+ m_processes.unlock();
+ return id;
+}
+
+bool
+CPCD::defineProcess(RequestStatus * rs, Process * arg){
+ if(arg->m_id == -1)
+ arg->m_id = findUniqueId();
+
+ Guard tmp(m_processes);
+
+ for(size_t i = 0; i<m_processes.size(); i++) {
+ Process * proc = m_processes[i];
+
+ if((strcmp(arg->m_name.c_str(), proc->m_name.c_str()) == 0) &&
+ (strcmp(arg->m_group.c_str(), proc->m_group.c_str()) == 0)) {
+ /* Identical names in the same group */
+ rs->err(AlreadyExists, "Name already exists");
+ return false;
+ }
+
+ if(arg->m_id == proc->m_id) {
+ /* Identical ID numbers */
+ rs->err(AlreadyExists, "Id already exists");
+ return false;
+ }
+ }
+
+ m_processes.push_back(arg, false);
+
+ notifyChanges();
+ report(arg->m_id, CPCEvent::ET_PROC_USER_DEFINE);
+
+ return true;
+}
+
+bool
+CPCD::undefineProcess(CPCD::RequestStatus *rs, int id) {
+
+ Guard tmp(m_processes);
+
+ Process * proc = 0;
+ size_t i;
+ for(i = 0; i < m_processes.size(); i++) {
+ if(m_processes[i]->m_id == id) {
+ proc = m_processes[i];
+ break;
+ }
+ }
+
+ if(proc == 0){
+ rs->err(NotExists, "No such process");
+ return false;
+ }
+
+ switch(proc->m_status){
+ case RUNNING:
+ case STOPPED:
+ case STOPPING:
+ case STARTING:
+ proc->stop();
+ m_processes.erase(i, false /* Already locked */);
+ }
+
+
+ notifyChanges();
+
+ report(id, CPCEvent::ET_PROC_USER_UNDEFINE);
+
+ return true;
+}
+
+bool
+CPCD::startProcess(CPCD::RequestStatus *rs, int id) {
+
+ Process * proc = 0;
+ {
+
+ Guard tmp(m_processes);
+
+ for(size_t i = 0; i < m_processes.size(); i++) {
+ if(m_processes[i]->m_id == id) {
+ proc = m_processes[i];
+ break;
+ }
+ }
+
+ if(proc == 0){
+ rs->err(NotExists, "No such process");
+ return false;
+ }
+
+ switch(proc->m_status){
+ case STOPPED:
+ proc->m_status = STARTING;
+ if(proc->start() != 0){
+ rs->err(Error, "Failed to start");
+ return false;
+ }
+ break;
+ case STARTING:
+ rs->err(Error, "Already starting");
+ return false;
+ case RUNNING:
+ rs->err(Error, "Already started");
+ return false;
+ case STOPPING:
+ rs->err(Error, "Currently stopping");
+ return false;
+ }
+
+ notifyChanges();
+ }
+ report(id, CPCEvent::ET_PROC_USER_START);
+
+ return true;
+}
+
+bool
+CPCD::stopProcess(CPCD::RequestStatus *rs, int id) {
+
+ Guard tmp(m_processes);
+
+ Process * proc = 0;
+ for(size_t i = 0; i < m_processes.size(); i++) {
+ if(m_processes[i]->m_id == id) {
+ proc = m_processes[i];
+ break;
+ }
+ }
+
+ if(proc == 0){
+ rs->err(NotExists, "No such process");
+ return false;
+ }
+
+ switch(proc->m_status){
+ case STARTING:
+ case RUNNING:
+ proc->stop();
+ break;
+ case STOPPED:
+ rs->err(AlreadyStopped, "Already stopped");
+ return false;
+ break;
+ case STOPPING:
+ rs->err(Error, "Already stopping");
+ return false;
+ }
+
+ notifyChanges();
+
+ report(id, CPCEvent::ET_PROC_USER_START);
+
+ return true;
+}
+
+bool
+CPCD::notifyChanges() {
+ bool ret = true;
+ if(!loadingProcessList)
+ ret = saveProcessList();
+
+ m_monitor->signal();
+
+ return ret;
+}
+
+/* Must be called with m_processlist locked */
+bool
+CPCD::saveProcessList(){
+ char newfile[PATH_MAX+4];
+ char oldfile[PATH_MAX+4];
+ char curfile[PATH_MAX];
+ FILE *f;
+
+ /* Create the filenames that we will use later */
+ BaseString::snprintf(newfile, sizeof(newfile), "%s.new", m_procfile.c_str());
+ BaseString::snprintf(oldfile, sizeof(oldfile), "%s.old", m_procfile.c_str());
+ BaseString::snprintf(curfile, sizeof(curfile), "%s", m_procfile.c_str());
+
+ f = fopen(newfile, "w");
+
+ if(f == NULL) {
+ /* XXX What should be done here? */
+ logger.critical("Cannot open `%s': %s\n", newfile, strerror(errno));
+ return false;
+ }
+
+ for(size_t i = 0; i<m_processes.size(); i++){
+ m_processes[i]->print(f);
+ fprintf(f, "\n");
+
+ if(m_processes[i]->m_processType == TEMPORARY){
+ /**
+ * Interactive process should never be "restarted" on cpcd restart
+ */
+ continue;
+ }
+
+ if(m_processes[i]->m_status == RUNNING ||
+ m_processes[i]->m_status == STARTING){
+ fprintf(f, "start process\nid: %d\n\n", m_processes[i]->m_id);
+ }
+ }
+
+ fclose(f);
+ f = NULL;
+
+ /* This will probably only work on reasonably Unix-like systems. You have
+ * been warned...
+ *
+ * The motivation behind all this link()ing is that the daemon might
+ * crash right in the middle of updating the configuration file, and in
+ * that case we want to be sure that the old file is around until we are
+ * guaranteed that there is always at least one copy of either the old or
+ * the new configuration file left.
+ */
+
+ /* Remove an old config file if it exists */
+ unlink(oldfile);
+
+ if(link(curfile, oldfile) != 0) /* make a backup of the running config */
+ logger.error("Cannot rename '%s' -> '%s'", curfile, oldfile);
+ else {
+ if(unlink(curfile) != 0) { /* remove the running config file */
+ logger.critical("Cannot remove file '%s'", curfile);
+ return false;
+ }
+ }
+
+ if(link(newfile, curfile) != 0) { /* put the new config file in place */
+ printf("-->%d\n", __LINE__);
+
+ logger.critical("Cannot rename '%s' -> '%s': %s",
+ curfile, newfile, strerror(errno));
+ return false;
+ }
+
+ /* XXX Ideally we would fsync() the directory here, but I'm not sure if
+ * that actually works.
+ */
+
+ unlink(newfile); /* remove the temporary file */
+ unlink(oldfile); /* remove the old file */
+
+ logger.info("Process list saved as '%s'", curfile);
+
+ return true;
+}
+
+bool
+CPCD::loadProcessList(){
+ BaseString secondfile;
+ FILE *f;
+
+ loadingProcessList = true;
+
+ secondfile.assfmt("%s.new", m_procfile.c_str());
+
+ /* Try to open the config file */
+ f = fopen(m_procfile.c_str(), "r");
+
+ /* If it did not exist, try to open the backup. See the saveProcessList()
+ * method for an explanation why it is done this way.
+ */
+ if(f == NULL) {
+ f = fopen(secondfile.c_str(), "r");
+
+ if(f == NULL) {
+ /* XXX What to do here? */
+ logger.info("Configuration file `%s' not found",
+ m_procfile.c_str());
+ logger.info("Starting with empty configuration");
+ loadingProcessList = false;
+ return false;
+ } else {
+ logger.info("Configuration file `%s' missing",
+ m_procfile.c_str());
+ logger.info("Backup configuration file `%s' is used",
+ secondfile.c_str());
+ /* XXX Maybe we should just rename the backup file to the official
+ * name, and be done with it?
+ */
+ }
+ }
+
+ CPCDAPISession sess(f, *this);
+ sess.loadFile();
+ loadingProcessList = false;
+
+ size_t i;
+ Vector<int> temporary;
+ for(i = 0; i<m_processes.size(); i++){
+ Process * proc = m_processes[i];
+ proc->readPid();
+ if(proc->m_processType == TEMPORARY){
+ temporary.push_back(proc->m_id);
+ }
+ }
+
+ for(i = 0; i<temporary.size(); i++){
+ RequestStatus rs;
+ undefineProcess(&rs, temporary[i]);
+ }
+
+ /* Don't call notifyChanges here, as that would save the file we just
+ loaded */
+ m_monitor->signal();
+ return true;
+}
+
+MutexVector<CPCD::Process *> *
+CPCD::getProcessList() {
+ return &m_processes;
+}
+
+void
+CPCD::RequestStatus::err(enum RequestStatusCode status, const char *msg) {
+ m_status = status;
+ BaseString::snprintf(m_errorstring, sizeof(m_errorstring), "%s", msg);
+}
+
+#if 0
+void
+CPCD::sigchild(int pid){
+ m_processes.lock();
+ for(size_t i = 0; i<m_processes.size(); i++){
+ if(m_processes[i].m_pid == pid){
+ }
+ }
+ wait(pid, 0, 0);
+}
+#endif
+
+ /** Register event subscriber */
+void
+CPCD::do_register(EventSubscriber * sub){
+ m_subscribers.lock();
+ m_subscribers.push_back(sub, false);
+ m_subscribers.unlock();
+}
+
+EventSubscriber*
+CPCD::do_unregister(EventSubscriber * sub){
+ m_subscribers.lock();
+
+ for(size_t i = 0; i<m_subscribers.size(); i++){
+ if(m_subscribers[i] == sub){
+ m_subscribers.erase(i);
+ m_subscribers.unlock();
+ return sub;
+ }
+ }
+
+ m_subscribers.unlock();
+ return 0;
+}
+
+void
+CPCD::report(int id, CPCEvent::EventType t){
+ CPCEvent e;
+ e.m_time = time(0);
+ e.m_proc = id;
+ e.m_type = t;
+ m_subscribers.lock();
+ for(size_t i = 0; i<m_subscribers.size(); i++){
+ (* m_subscribers[i]).report(e);
+ }
+ m_subscribers.unlock();
+}
+
+template class MutexVector<EventSubscriber*>;
diff --git a/storage/ndb/src/cw/cpcd/CPCD.hpp b/storage/ndb/src/cw/cpcd/CPCD.hpp
new file mode 100644
index 00000000000..a5c0bef1dac
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/CPCD.hpp
@@ -0,0 +1,382 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef CPCD_HPP
+#define CPCD_HPP
+
+#include <Vector.hpp>
+#include <Properties.hpp>
+#include <NdbOut.hpp>
+#include <NdbThread.h>
+#include <NdbCondition.h>
+#include <BaseString.hpp>
+
+/* XXX Need to figure out how to do this for non-Unix systems */
+#define CPCD_DEFAULT_WORK_DIR "/var/run/ndb_cpcd"
+#define CPCD_DEFAULT_PROC_FILE "ndb_cpcd.conf"
+#define CPCD_DEFAULT_TCP_PORT 1234
+#define CPCD_DEFAULT_POLLING_INTERVAL 5 /* seconds */
+#define CPCD_DEFAULT_CONFIG_FILE "/etc/ndb_cpcd.conf"
+
+enum ProcessStatus {
+ STOPPED = 0,
+ STARTING = 1,
+ RUNNING = 2,
+ STOPPING = 3
+};
+
+enum ProcessType {
+ PERMANENT = 0,
+ TEMPORARY = 1
+};
+
+struct CPCEvent {
+ enum EventType {
+ ET_USER_CONNECT,
+ ET_USER_DISCONNECT,
+
+ ET_PROC_USER_DEFINE, // Defined proc
+ ET_PROC_USER_UNDEFINE, // Undefined proc
+ ET_PROC_USER_START, // Proc ordered to start
+ ET_PROC_USER_STOP, // Proc ordered to stop
+ ET_PROC_STATE_RUNNING, // exec returned(?) ok
+ ET_PROC_STATE_STOPPED // detected that proc is ! running
+ };
+
+ int m_proc;
+ time_t m_time;
+ EventType m_type;
+};
+
+struct EventSubscriber {
+ virtual void report(const CPCEvent &) = 0;
+};
+
+/**
+ * @brief Error codes for CPCD requests
+ */
+enum RequestStatusCode {
+ OK = 0, ///< Everything OK
+ Error = 1, ///< Generic error
+ AlreadyExists = 2, ///< Entry already exists in list
+ NotExists = 3, ///< Entry does not exist in list
+ AlreadyStopped = 4
+};
+
+/**
+ * @class CPCD
+ * @brief Manages processes, letting them be controlled with a TCP connection.
+ *
+ * The class implementing the Cluster Process Control Daemon
+ */
+class CPCD {
+public:
+ /** @brief Describes the status of a client request */
+ class RequestStatus {
+ public:
+ /** @brief Constructs an empty RequestStatus */
+ RequestStatus() { m_status = OK; m_errorstring[0] = '\0'; };
+
+ /** @brief Sets an errorcode and a printable message */
+ void err(enum RequestStatusCode, const char *);
+
+ /** @brief Returns the error message */
+ char *getErrMsg() { return m_errorstring; };
+
+ /** @brief Returns the error code */
+ enum RequestStatusCode getStatus() { return m_status; };
+ private:
+ enum RequestStatusCode m_status;
+ char m_errorstring[256];
+ };
+ /**
+ * @brief Manages a process
+ */
+ class Process {
+ int m_pid;
+ public:
+ /**
+ * @brief Constructs and empty Process
+ */
+ Process(const Properties & props, class CPCD *cpcd);
+ /**
+ * @brief Monitors the process
+ *
+ * The process is started or stopped as needed.
+ */
+ void monitor();
+
+ /**
+ * @brief Checks if the process is running or not
+ *
+ * @return
+ * - true if the process is running,
+ * - false if the process is not running
+ */
+ bool isRunning();
+
+ /** @brief Starts the process */
+ int start();
+
+ /** @brief Stops the process */
+ void stop();
+
+ /**
+ * @brief Reads the pid from stable storage
+ *
+ * @return The pid number
+ */
+ int readPid();
+
+ /**
+ * @brief Writes the pid from stable storage
+ *
+ * @return
+ * - 0 if successful
+ - -1 and sets errno if an error occured
+ */
+ int writePid(int pid);
+
+ /**
+ * @brief Prints a textual description of the process on a file
+ */
+ void print(FILE *);
+
+ /** Id number of the Process.
+ *
+ * @note This is not the same as a pid. This number is used in the
+ * protocol, and will not be changed if a processes is restarted.
+ */
+ int m_id;
+
+ /** @brief The name shown to the user */
+ BaseString m_name;
+
+ /** @brief Used to group a number of processes */
+ BaseString m_group;
+
+ /** @brief Environment variables
+ *
+ * Environmentvariables to add for the process.
+ *
+ * @note
+ * - The environment cpcd started with is preserved
+ * - There is no way to delete variables
+ */
+ BaseString m_env;
+
+ /** @brief Path to the binary to run */
+ BaseString m_path;
+
+ /** @brief Arguments to the process.
+ *
+ * @note
+ * - This includes argv[0].
+ * - If no argv[0] is given, argv[0] will be set to m_path.
+ */
+ BaseString m_args;
+
+ /**
+ * @brief Type of process
+ *
+ * Either set to "interactive" or "permanent".
+ */
+ BaseString m_type;
+ ProcessType m_processType;
+
+ /**
+ * @brief Working directory
+ *
+ * Working directory the process will start in.
+ */
+ BaseString m_cwd;
+
+ /**
+ * @brief Owner of the process.
+ *
+ * @note This will not affect the process' uid or gid;
+ * it is only used for managemental purposes.
+ * @see m_runas
+ */
+ BaseString m_owner;
+
+ /**
+ * @bried Run as
+ * @note This affects uid
+ * @see m_owner
+ */
+ BaseString m_runas;
+
+ /**
+ * @brief redirection for stdin
+ */
+ BaseString m_stdin;
+
+ /**
+ * @brief redirection for stdout
+ */
+ BaseString m_stdout;
+
+ /**
+ * @brief redirection for stderr
+ */
+ BaseString m_stderr;
+
+ /** @brief Status of the process */
+ enum ProcessStatus m_status;
+
+ /**
+ * @brief ulimits for process
+ * @desc Format c:unlimited d:0 ...
+ */
+ BaseString m_ulimit;
+ private:
+ class CPCD *m_cpcd;
+ void do_exec();
+ };
+
+ /**
+ * @brief Starts and stops processes as needed
+ *
+ * At a specified interval (default 5 seconds) calls the monitor function
+ * of all the processes in the CPCDs list, causing the to start or
+ * stop, depending on the configuration.
+ */
+ class Monitor {
+ public:
+ /** Creates a new CPCD::Monitor object, connected to the specified
+ * CPCD.
+ * A new thread will be created, which will poll the processes of
+ * the CPCD at the specifed interval.
+ */
+ Monitor(CPCD *cpcd, int poll = CPCD_DEFAULT_POLLING_INTERVAL);
+
+ /** Stops the monitor, but does not stop the processes */
+ ~Monitor();
+
+ /** Runs the monitor thread. */
+ void run();
+
+ /** Signals configuration changes to the monitor thread, causing it to
+ * do the check without waiting for the timeout */
+ void signal();
+ private:
+ class CPCD *m_cpcd;
+ struct NdbThread *m_monitorThread;
+ bool m_monitorThreadQuitFlag;
+ struct NdbCondition *m_changeCondition;
+ NdbMutex *m_changeMutex;
+ int m_pollingInterval; /* seconds */
+ };
+
+ /** @brief Constructs a CPCD object */
+ CPCD();
+
+ /**
+ * @brief Destroys a CPCD object,
+ * but does not stop the processes it manages
+ */
+ ~CPCD();
+
+ /** Adds a Process to the CPCDs list of managed Processes.
+ *
+ * @note The process will not be started until it is explicitly
+ * marked as running with CPCD::startProcess().
+ *
+ * @return
+ * - true if the addition was successful,
+ * - false if not
+ * - RequestStatus will be filled in with a suitable error
+ * if an error occured.
+ */
+ bool defineProcess(RequestStatus *rs, Process * arg);
+
+ /** Removes a Process from the CPCD.
+ *
+ * @note A Process that is running cannot be removed.
+ *
+ * @return
+ * - true if the removal was successful,
+ * - false if not
+ * - The RequestStatus will be filled in with a suitable error
+ * if an error occured.
+ */
+ bool undefineProcess(RequestStatus *rs, int id);
+
+ /** Marks a Process for starting.
+ *
+ * @note The fact that a process has started does not mean it will actually
+ * start properly. This command only makes sure the CPCD will
+ * try to start it.
+ *
+ * @return
+ * - true if the marking was successful
+ * - false if not
+ * - RequestStatus will be filled in with a suitable error
+ * if an error occured.
+ */
+ bool startProcess(RequestStatus *rs, int id);
+
+ /** Marks a Process for stopping.
+ *
+ * @return
+ * - true if the marking was successful
+ * - false if not
+ * - The RequestStatus will be filled in with a suitable error
+ * if an error occured.
+ */
+ bool stopProcess(RequestStatus *rs, int id);
+
+ /** Generates a list of processes, and sends them to the CPCD client */
+ bool listProcesses(RequestStatus *rs, MutexVector<const char *> &);
+
+ /** Set to true while the CPCD is reading the configuration file */
+ bool loadingProcessList;
+
+ /** Saves the list of Processes and their status to the configuration file.
+ * Called whenever the configuration is changed.
+ */
+ bool saveProcessList();
+
+ /** Loads the list of Processes and their status from the configuration
+ * file.
+ * @note This function should only be called when the CPCD is starting,
+ * calling it at other times will cause unspecified behaviour.
+ */
+ bool loadProcessList();
+
+ /** Returns the list of processes */
+ MutexVector<Process *> *getProcessList();
+
+ /** The list of processes. Should not be used directly */
+ MutexVector<Process *> m_processes;
+
+ /** Register event subscriber */
+ void do_register(EventSubscriber * sub);
+ EventSubscriber* do_unregister(EventSubscriber * sub);
+
+private:
+ friend class Process;
+ bool notifyChanges();
+ int findUniqueId();
+ BaseString m_procfile;
+ Monitor *m_monitor;
+
+ void report(int id, CPCEvent::EventType);
+ MutexVector<EventSubscriber *> m_subscribers;
+};
+
+#endif
diff --git a/storage/ndb/src/cw/cpcd/Makefile.am b/storage/ndb/src/cw/cpcd/Makefile.am
new file mode 100644
index 00000000000..75f557b2af7
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/Makefile.am
@@ -0,0 +1,20 @@
+
+ndbbin_PROGRAMS = ndb_cpcd
+
+ndb_cpcd_SOURCES = main.cpp CPCD.cpp Process.cpp APIService.cpp Monitor.cpp common.cpp
+
+LDADD_LOC = \
+ $(top_builddir)/ndb/src/libndbclient.la \
+ $(top_builddir)/dbug/libdbug.a \
+ $(top_builddir)/mysys/libmysys.a \
+ $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
+
+include $(top_srcdir)/ndb/config/common.mk.am
+include $(top_srcdir)/ndb/config/type_util.mk.am
+
+ndb_cpcd_LDFLAGS = @ndb_bin_am_ldflags@
+
+# Don't update the files from bitkeeper
+%::SCCS/s.%
+
+windoze-dsp:
diff --git a/storage/ndb/src/cw/cpcd/Monitor.cpp b/storage/ndb/src/cw/cpcd/Monitor.cpp
new file mode 100644
index 00000000000..141de926d4d
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/Monitor.cpp
@@ -0,0 +1,79 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+
+#include <NdbThread.h>
+#include <NdbOut.hpp>
+#include <NdbSleep.h>
+
+#include "CPCD.hpp"
+#include "common.hpp"
+
+static void *
+monitor_thread_create_wrapper(void * arg) {
+ CPCD::Monitor *mon = (CPCD::Monitor *)arg;
+ mon->run();
+ return NULL;
+}
+
+CPCD::Monitor::Monitor(CPCD *cpcd, int poll) {
+ m_cpcd = cpcd;
+ m_pollingInterval = poll;
+ m_changeCondition = NdbCondition_Create();
+ m_changeMutex = NdbMutex_Create();
+ m_monitorThread = NdbThread_Create(monitor_thread_create_wrapper,
+ (NDB_THREAD_ARG*) this,
+ 32768,
+ "ndb_cpcd_monitor",
+ NDB_THREAD_PRIO_MEAN);
+ m_monitorThreadQuitFlag = false;
+}
+
+CPCD::Monitor::~Monitor() {
+ NdbThread_Destroy(&m_monitorThread);
+ NdbCondition_Destroy(m_changeCondition);
+ NdbMutex_Destroy(m_changeMutex);
+}
+
+void
+CPCD::Monitor::run() {
+ while(1) {
+ NdbMutex_Lock(m_changeMutex);
+ NdbCondition_WaitTimeout(m_changeCondition,
+ m_changeMutex,
+ m_pollingInterval * 1000);
+
+ MutexVector<CPCD::Process *> &proc = *m_cpcd->getProcessList();
+
+ proc.lock();
+
+ for(size_t i = 0; i < proc.size(); i++) {
+ proc[i]->monitor();
+ }
+
+ proc.unlock();
+
+ NdbMutex_Unlock(m_changeMutex);
+ }
+}
+
+void
+CPCD::Monitor::signal() {
+ NdbCondition_Signal(m_changeCondition);
+}
+
+template class MutexVector<CPCD::Process*>;
diff --git a/storage/ndb/src/cw/cpcd/Process.cpp b/storage/ndb/src/cw/cpcd/Process.cpp
new file mode 100644
index 00000000000..2509f34e882
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/Process.cpp
@@ -0,0 +1,479 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+
+#include <BaseString.hpp>
+#include <InputStream.hpp>
+
+#include "common.hpp"
+#include "CPCD.hpp"
+
+#include <pwd.h>
+#ifdef HAVE_GETRLIMIT
+#include <sys/resource.h>
+#endif
+
+void
+CPCD::Process::print(FILE * f){
+ fprintf(f, "define process\n");
+ fprintf(f, "id: %d\n", m_id);
+ fprintf(f, "name: %s\n", m_name.c_str() ? m_name.c_str() : "");
+ fprintf(f, "group: %s\n", m_group.c_str() ? m_group.c_str() : "");
+ fprintf(f, "env: %s\n", m_env.c_str() ? m_env.c_str() : "");
+ fprintf(f, "path: %s\n", m_path.c_str() ? m_path.c_str() : "");
+ fprintf(f, "args: %s\n", m_args.c_str() ? m_args.c_str() : "");
+ fprintf(f, "type: %s\n", m_type.c_str() ? m_type.c_str() : "");
+ fprintf(f, "cwd: %s\n", m_cwd.c_str() ? m_cwd.c_str() : "");
+ fprintf(f, "owner: %s\n", m_owner.c_str() ? m_owner.c_str() : "");
+ fprintf(f, "runas: %s\n", m_runas.c_str() ? m_runas.c_str() : "");
+ fprintf(f, "stdin: %s\n", m_stdin.c_str() ? m_stdin.c_str() : "");
+ fprintf(f, "stdout: %s\n", m_stdout.c_str() ? m_stdout.c_str() : "");
+ fprintf(f, "stderr: %s\n", m_stderr.c_str() ? m_stderr.c_str() : "");
+ fprintf(f, "ulimit: %s\n", m_ulimit.c_str() ? m_ulimit.c_str() : "");
+}
+
+CPCD::Process::Process(const Properties & props, class CPCD *cpcd) {
+ m_id = -1;
+ m_pid = -1;
+ props.get("id", (Uint32 *) &m_id);
+ props.get("name", m_name);
+ props.get("group", m_group);
+ props.get("env", m_env);
+ props.get("path", m_path);
+ props.get("args", m_args);
+ props.get("cwd", m_cwd);
+ props.get("owner", m_owner);
+ props.get("type", m_type);
+ props.get("runas", m_runas);
+
+ props.get("stdin", m_stdin);
+ props.get("stdout", m_stdout);
+ props.get("stderr", m_stderr);
+ props.get("ulimit", m_ulimit);
+ m_status = STOPPED;
+
+ if(strcasecmp(m_type.c_str(), "temporary") == 0){
+ m_processType = TEMPORARY;
+ } else {
+ m_processType = PERMANENT;
+ }
+
+ m_cpcd = cpcd;
+}
+
+void
+CPCD::Process::monitor() {
+ switch(m_status) {
+ case STARTING:
+ break;
+ case RUNNING:
+ if(!isRunning()){
+ m_cpcd->report(m_id, CPCEvent::ET_PROC_STATE_STOPPED);
+ if(m_processType == TEMPORARY){
+ m_status = STOPPED;
+ } else {
+ start();
+ }
+ }
+ break;
+ case STOPPED:
+ assert(!isRunning());
+ break;
+ case STOPPING:
+ break;
+ }
+}
+
+bool
+CPCD::Process::isRunning() {
+
+ if(m_pid <= 1){
+ //logger.critical("isRunning(%d) invalid pid: %d", m_id, m_pid);
+ return false;
+ }
+ /* Check if there actually exists a process with such a pid */
+ errno = 0;
+ int s = kill((pid_t)-m_pid, 0); /* Sending "signal" 0 to a process only
+ * checkes if the process actually exists */
+ if(s != 0) {
+ switch(errno) {
+ case EPERM:
+ logger.critical("Not enough privileges to control pid %d\n", m_pid);
+ break;
+ case ESRCH:
+ /* The pid in the file does not exist, which probably means that it
+ has died, or the file contains garbage for some other reason */
+ break;
+ default:
+ logger.critical("Cannot not control pid %d: %s\n", m_pid, strerror(errno));
+ break;
+ }
+ return false;
+ }
+ return true;
+}
+
+int
+CPCD::Process::readPid() {
+ if(m_pid != -1){
+ logger.critical("Reading pid while != -1(%d)", m_pid);
+ return m_pid;
+ }
+
+ char filename[PATH_MAX*2+1];
+ char buf[1024];
+ FILE *f;
+
+ memset(buf, 0, sizeof(buf));
+
+ BaseString::snprintf(filename, sizeof(filename), "%d", m_id);
+
+ f = fopen(filename, "r");
+
+ if(f == NULL){
+ return -1; /* File didn't exist */
+ }
+
+ errno = 0;
+ size_t r = fread(buf, 1, sizeof(buf), f);
+ fclose(f);
+ if(r > 0)
+ m_pid = strtol(buf, (char **)NULL, 0);
+
+ if(errno == 0){
+ return m_pid;
+ }
+
+ return -1;
+}
+
+int
+CPCD::Process::writePid(int pid) {
+ char tmpfilename[PATH_MAX+1+4+8];
+ char filename[PATH_MAX*2+1];
+ FILE *f;
+
+ BaseString::snprintf(tmpfilename, sizeof(tmpfilename), "tmp.XXXXXX");
+ BaseString::snprintf(filename, sizeof(filename), "%d", m_id);
+
+ int fd = mkstemp(tmpfilename);
+ if(fd < 0) {
+ logger.error("Cannot open `%s': %s\n", tmpfilename, strerror(errno));
+ return -1; /* Couldn't open file */
+ }
+
+ f = fdopen(fd, "w");
+
+ if(f == NULL) {
+ logger.error("Cannot open `%s': %s\n", tmpfilename, strerror(errno));
+ return -1; /* Couldn't open file */
+ }
+
+ fprintf(f, "%d", pid);
+ fclose(f);
+
+ if(rename(tmpfilename, filename) == -1){
+ logger.error("Unable to rename from %s to %s", tmpfilename, filename);
+ return -1;
+ }
+ return 0;
+}
+
+static void
+setup_environment(const char *env) {
+ char **p;
+ p = BaseString::argify("", env);
+ for(int i = 0; p[i] != NULL; i++){
+ /*int res = */ putenv(p[i]);
+ }
+}
+
+static
+int
+set_ulimit(const BaseString & pair){
+#ifdef HAVE_GETRLIMIT
+ errno = 0;
+ Vector<BaseString> list;
+ pair.split(list, ":");
+ if(list.size() != 2){
+ logger.error("Unable to process ulimit: split >%s< list.size()=%d",
+ pair.c_str(), list.size());
+ return -1;
+ }
+
+ int res;
+ rlim_t value = RLIM_INFINITY;
+ if(!(list[1].trim() == "unlimited")){
+ value = atoi(list[1].c_str());
+ }
+
+ struct rlimit rlp;
+#define _RLIMIT_FIX(x) { res = getrlimit(x,&rlp); if(!res){ rlp.rlim_cur = value; res = setrlimit(x, &rlp); }}
+
+ if(list[0].trim() == "c"){
+ _RLIMIT_FIX(RLIMIT_CORE);
+ } else if(list[0] == "d"){
+ _RLIMIT_FIX(RLIMIT_DATA);
+ } else if(list[0] == "f"){
+ _RLIMIT_FIX(RLIMIT_FSIZE);
+ } else if(list[0] == "n"){
+ _RLIMIT_FIX(RLIMIT_NOFILE);
+ } else if(list[0] == "s"){
+ _RLIMIT_FIX(RLIMIT_STACK);
+ } else if(list[0] == "t"){
+ _RLIMIT_FIX(RLIMIT_CPU);
+ } else {
+ res= -11;
+ errno = EINVAL;
+ }
+ if(res){
+ logger.error("Unable to process ulimit: %s res=%d error=%d(%s)",
+ pair.c_str(), res, errno, strerror(errno));
+ return -1;
+ }
+#endif
+ return 0;
+}
+
+void
+CPCD::Process::do_exec() {
+ size_t i;
+ setup_environment(m_env.c_str());
+
+ char **argv = BaseString::argify(m_path.c_str(), m_args.c_str());
+
+ if(strlen(m_cwd.c_str()) > 0) {
+ int err = chdir(m_cwd.c_str());
+ if(err == -1) {
+ BaseString err;
+ logger.error("%s: %s\n", m_cwd.c_str(), strerror(errno));
+ _exit(1);
+ }
+ }
+
+ Vector<BaseString> ulimit;
+ m_ulimit.split(ulimit);
+ for(i = 0; i<ulimit.size(); i++){
+ if(ulimit[i].trim().length() > 0 && set_ulimit(ulimit[i]) != 0){
+ _exit(1);
+ }
+ }
+
+ int fd = open("/dev/null", O_RDWR, 0);
+ if(fd == -1) {
+ logger.error("Cannot open `/dev/null': %s\n", strerror(errno));
+ _exit(1);
+ }
+
+ BaseString * redirects[] = { &m_stdin, &m_stdout, &m_stderr };
+ int fds[3];
+ for(i = 0; i<3; i++){
+ if(redirects[i]->empty()){
+#ifndef DEBUG
+ dup2(fd, i);
+#endif
+ continue;
+ }
+
+ if((* redirects[i]) == "2>&1" && i == 2){
+ dup2(fds[1], 2);
+ continue;
+ }
+
+ /**
+ * Make file
+ */
+ int flags = 0;
+ int mode = S_IRUSR | S_IWUSR ;
+ if(i == 0){
+ flags |= O_RDONLY;
+ } else {
+ flags |= O_WRONLY | O_CREAT | O_APPEND;
+ }
+ int f = fds[i]= open(redirects[i]->c_str(), flags, mode);
+ if(f == -1){
+ logger.error("Cannot redirect %d to/from '%s' : %s\n", i,
+ redirects[i]->c_str(), strerror(errno));
+ _exit(1);
+ }
+ dup2(f, i);
+ }
+
+ /* Close all filedescriptors */
+ for(i = STDERR_FILENO+1; (int)i < getdtablesize(); i++)
+ close(i);
+
+ execv(m_path.c_str(), argv);
+ /* XXX If we reach this point, an error has occurred, but it's kind of hard
+ * to report it, because we've closed all files... So we should probably
+ * create a new logger here */
+ logger.error("Exec failed: %s\n", strerror(errno));
+ /* NOTREACHED */
+}
+
+int
+CPCD::Process::start() {
+ /* We need to fork() twice, so that the second child (grandchild?) can
+ * become a daemon. The original child then writes the pid file,
+ * so that the monitor knows the pid of the new process, and then
+ * exit()s. That way, the monitor process can pickup the pid, and
+ * the running process is a daemon.
+ *
+ * This is a bit tricky but has the following advantages:
+ * - the cpcd can die, and "reconnect" to the monitored clients
+ * without restarting them.
+ * - the cpcd does not have to wait() for the childs. init(1) will
+ * take care of that.
+ */
+ logger.info("Starting %d: %s", m_id, m_name.c_str());
+ m_status = STARTING;
+
+ int pid = -1;
+ switch(m_processType){
+ case TEMPORARY:{
+ /**
+ * Simple fork
+ * don't ignore child
+ */
+ switch(pid = fork()) {
+ case 0: /* Child */
+ setsid();
+ writePid(getpgrp());
+ if(runas(m_runas.c_str()) == 0){
+ signal(SIGCHLD, SIG_DFL);
+ do_exec();
+ }
+ _exit(1);
+ break;
+ case -1: /* Error */
+ logger.error("Cannot fork: %s\n", strerror(errno));
+ m_status = STOPPED;
+ return -1;
+ break;
+ default: /* Parent */
+ logger.debug("Started temporary %d : pid=%d", m_id, pid);
+ m_cpcd->report(m_id, CPCEvent::ET_PROC_STATE_RUNNING);
+ break;
+ }
+ break;
+ }
+ case PERMANENT:{
+ /**
+ * PERMANENT
+ */
+ switch(fork()) {
+ case 0: /* Child */
+ signal(SIGCHLD, SIG_IGN);
+ switch(pid = fork()) {
+ case 0: /* Child */
+ setsid();
+ writePid(getpgrp());
+ if(runas(m_runas.c_str()) != 0){
+ _exit(1);
+ }
+ signal(SIGCHLD, SIG_DFL);
+ do_exec();
+ _exit(1);
+ /* NOTREACHED */
+ break;
+ case -1: /* Error */
+ logger.error("Cannot fork: %s\n", strerror(errno));
+ writePid(-1);
+ _exit(1);
+ break;
+ default: /* Parent */
+ logger.debug("Started permanent %d : pid=%d", m_id, pid);
+ _exit(0);
+ break;
+ }
+ break;
+ case -1: /* Error */
+ logger.error("Cannot fork: %s\n", strerror(errno));
+ m_status = STOPPED;
+ return -1;
+ break;
+ default: /* Parent */
+ m_cpcd->report(m_id, CPCEvent::ET_PROC_STATE_RUNNING);
+ break;
+ }
+ break;
+ }
+ default:
+ logger.critical("Unknown process type");
+ return -1;
+ }
+
+ while(readPid() < 0){
+ sched_yield();
+ }
+
+ errno = 0;
+ pid_t pgid = getpgid(pid);
+
+ if(pgid != -1 && pgid != m_pid){
+ logger.error("pgid and m_pid don't match: %d %d (%d)", pgid, m_pid, pid);
+ }
+
+ if(isRunning()){
+ m_status = RUNNING;
+ return 0;
+ }
+ m_status = STOPPED;
+ return -1;
+}
+
+void
+CPCD::Process::stop() {
+
+ char filename[PATH_MAX*2+1];
+ BaseString::snprintf(filename, sizeof(filename), "%d", m_id);
+ unlink(filename);
+
+ if(m_pid <= 1){
+ logger.critical("Stopping process with bogus pid: %d id: %d",
+ m_pid, m_id);
+ return;
+ }
+ m_status = STOPPING;
+
+ errno = 0;
+ int ret = kill(-m_pid, SIGTERM);
+ switch(ret) {
+ case 0:
+ logger.debug("Sent SIGTERM to pid %d", (int)-m_pid);
+ break;
+ default:
+ logger.debug("kill pid: %d : %s", (int)-m_pid, strerror(errno));
+ break;
+ }
+
+ if(isRunning()){
+ errno = 0;
+ ret = kill(-m_pid, SIGKILL);
+ switch(ret) {
+ case 0:
+ logger.debug("Sent SIGKILL to pid %d", (int)-m_pid);
+ break;
+ default:
+ logger.debug("kill pid: %d : %s\n", (int)-m_pid, strerror(errno));
+ break;
+ }
+ }
+
+ m_pid = -1;
+ m_status = STOPPED;
+}
diff --git a/storage/ndb/src/cw/cpcd/common.cpp b/storage/ndb/src/cw/cpcd/common.cpp
new file mode 100644
index 00000000000..53c0e4d5a64
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/common.cpp
@@ -0,0 +1,98 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h>
+
+#include "common.hpp"
+#include <logger/Logger.hpp>
+#include <pwd.h>
+
+#include <Properties.hpp>
+#include <BaseString.hpp>
+
+int debug = 0;
+
+Logger logger;
+
+int
+runas(const char * user){
+ if(user == 0 || strlen(user) == 0){
+ return 0;
+ }
+ struct passwd * pw = getpwnam(user);
+ if(pw == 0){
+ logger.error("Can't find user to %s", user);
+ return -1;
+ }
+ uid_t uid = pw->pw_uid;
+ gid_t gid = pw->pw_gid;
+ int res = setgid(gid);
+ if(res != 0){
+ logger.error("Can't change group to %s(%d)", user, gid);
+ return res;
+ }
+
+ res = setuid(uid);
+ if(res != 0){
+ logger.error("Can't change user to %s(%d)", user, uid);
+ }
+ return res;
+}
+
+int
+insert(const char * pair, Properties & p){
+ BaseString tmp(pair);
+
+ tmp.trim(" \t\n\r");
+
+ Vector<BaseString> split;
+ tmp.split(split, ":=", 2);
+
+ if(split.size() != 2)
+ return -1;
+
+ p.put(split[0].trim().c_str(), split[1].trim().c_str());
+
+ return 0;
+}
+
+int
+insert_file(FILE * f, class Properties& p, bool break_on_empty){
+ if(f == 0)
+ return -1;
+
+ while(!feof(f)){
+ char buf[1024];
+ fgets(buf, 1024, f);
+ BaseString tmp = buf;
+
+ if(tmp.length() > 0 && tmp.c_str()[0] == '#')
+ continue;
+
+ if(insert(tmp.c_str(), p) != 0 && break_on_empty)
+ break;
+ }
+
+ return 0;
+}
+
+int
+insert_file(const char * filename, class Properties& p){
+ FILE * f = fopen(filename, "r");
+ int res = insert_file(f, p);
+ if(f) fclose(f);
+ return res;
+}
diff --git a/storage/ndb/src/cw/cpcd/common.hpp b/storage/ndb/src/cw/cpcd/common.hpp
new file mode 100644
index 00000000000..4f5f702762f
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/common.hpp
@@ -0,0 +1,36 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef __CPCD_COMMON_HPP_INCLUDED__
+#define __CPCD_COMMON_HPP_INCLUDED__
+
+#include <ndb_global.h>
+#include <logger/Logger.hpp>
+#if 0
+#include <getarg.h>
+#endif
+
+extern int debug;
+
+extern Logger logger;
+
+int runas(const char * user);
+int insert(const char * pair, class Properties & p);
+
+int insert_file(const char * filename, class Properties&);
+int insert_file(FILE *, class Properties&, bool break_on_empty = false);
+
+#endif /* ! __CPCD_COMMON_HPP_INCLUDED__ */
diff --git a/storage/ndb/src/cw/cpcd/main.cpp b/storage/ndb/src/cw/cpcd/main.cpp
new file mode 100644
index 00000000000..9bbd5e484d4
--- /dev/null
+++ b/storage/ndb/src/cw/cpcd/main.cpp
@@ -0,0 +1,187 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <ndb_global.h> /* Needed for mkdir(2) */
+#include <my_sys.h>
+#include <my_getopt.h>
+#include <mysql_version.h>
+#include <ndb_version.h>
+
+#include "CPCD.hpp"
+#include "APIService.hpp"
+#include <NdbMain.h>
+#include <NdbSleep.h>
+#include <BaseString.hpp>
+#include <logger/Logger.hpp>
+#include <logger/FileLogHandler.hpp>
+#include <logger/SysLogHandler.hpp>
+
+#include "common.hpp"
+
+static const char *work_dir = CPCD_DEFAULT_WORK_DIR;
+static short unsigned int port;
+static int use_syslog;
+static const char *logfile = NULL;
+static const char *config_file = CPCD_DEFAULT_CONFIG_FILE;
+static const char *user = 0;
+
+static struct my_option my_long_options[] =
+{
+ { "work-dir", 'w', "Work directory",
+ (gptr*) &work_dir, (gptr*) &work_dir, 0,
+ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+ { "port", 'p', "TCP port to listen on",
+ (gptr*) &port, (gptr*) &port, 0,
+ GET_INT, REQUIRED_ARG, CPCD_DEFAULT_TCP_PORT, 0, 0, 0, 0, 0 },
+ { "syslog", 'S', "Log events to syslog",
+ (gptr*) &use_syslog, (gptr*) &use_syslog, 0,
+ GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "logfile", 'L', "File to log events to",
+ (gptr*) &logfile, (gptr*) &logfile, 0,
+ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+ { "debug", 'D', "Enable debug mode",
+ (gptr*) &debug, (gptr*) &debug, 0,
+ GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "config", 'c', "Config file",
+ (gptr*) &config_file, (gptr*) &config_file, 0,
+ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+ { "user", 'u', "Run as user",
+ (gptr*) &user, (gptr*) &user, 0,
+ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+ { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
+};
+
+static my_bool
+get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
+ char *argument)
+{
+ return 0;
+}
+
+static CPCD * g_cpcd = 0;
+#if 0
+extern "C" static void sig_child(int signo, siginfo_t*, void*);
+#endif
+
+const char *progname = "ndb_cpcd";
+
+int main(int argc, char** argv){
+ int save_argc= argc;
+ char** save_argv= argv;
+ const char *load_default_groups[]= { "ndb_cpcd",0 };
+ MY_INIT(argv[0]);
+
+ load_defaults("ndb_cpcd",load_default_groups,&argc,&argv);
+ if (handle_options(&argc, &argv, my_long_options, get_one_option)) {
+ my_print_help(my_long_options);
+ my_print_variables(my_long_options);
+ exit(1);
+ }
+
+ logger.setCategory(progname);
+ logger.enable(Logger::LL_ALL);
+
+ if(debug)
+ logger.createConsoleHandler();
+
+ if(user && runas(user) != 0){
+ logger.critical("Unable to change user: %s", user);
+ _exit(1);
+ }
+
+ if(logfile != NULL){
+ BaseString tmp;
+ if(logfile[0] != '/')
+ tmp.append(work_dir);
+ tmp.append(logfile);
+ logger.addHandler(new FileLogHandler(tmp.c_str()));
+ }
+
+ if(use_syslog)
+ logger.addHandler(new SysLogHandler());
+
+ logger.info("Starting");
+
+ CPCD cpcd;
+ g_cpcd = &cpcd;
+
+ /* XXX This will probably not work on !unix */
+ int err = mkdir(work_dir, S_IRWXU | S_IRGRP | S_IROTH);
+ if(err != 0) {
+ switch(errno) {
+ case EEXIST:
+ break;
+ default:
+ fprintf(stderr, "Cannot mkdir %s: %s\n", work_dir, strerror(errno));
+ exit(1);
+ }
+ }
+
+ if(strlen(work_dir) > 0){
+ logger.debug("Changing dir to '%s'", work_dir);
+ if((err = chdir(work_dir)) != 0){
+ fprintf(stderr, "Cannot chdir %s: %s\n", work_dir, strerror(errno));
+ exit(1);
+ }
+ }
+
+ cpcd.loadProcessList();
+
+ SocketServer * ss = new SocketServer();
+ CPCDAPIService * serv = new CPCDAPIService(cpcd);
+ if(!ss->setup(serv, &port)){
+ logger.critical("Cannot setup server: %s", strerror(errno));
+ sleep(1);
+ delete ss;
+ delete serv;
+ return 1;
+ }
+
+ ss->startServer();
+
+ {
+ signal(SIGPIPE, SIG_IGN);
+ signal(SIGCHLD, SIG_IGN);
+#if 0
+ struct sigaction act;
+ act.sa_handler = 0;
+ act.sa_sigaction = sig_child;
+ sigemptyset(&act.sa_mask);
+ act.sa_flags = SA_SIGINFO;
+ sigaction(SIGCHLD, &act, 0);
+#endif
+ }
+
+ logger.debug("Start completed");
+ while(true) NdbSleep_MilliSleep(1000);
+
+ delete ss;
+ return 0;
+}
+
+#if 0
+extern "C"
+void
+sig_child(int signo, siginfo_t* info, void*){
+ printf("signo: %d si_signo: %d si_errno: %d si_code: %d si_pid: %d\n",
+ signo,
+ info->si_signo,
+ info->si_errno,
+ info->si_code,
+ info->si_pid);
+
+}
+#endif