diff options
Diffstat (limited to 'ndb/src/cw/cpcd/CPCD.cpp')
-rw-r--r-- | ndb/src/cw/cpcd/CPCD.cpp | 435 |
1 files changed, 435 insertions, 0 deletions
diff --git a/ndb/src/cw/cpcd/CPCD.cpp b/ndb/src/cw/cpcd/CPCD.cpp new file mode 100644 index 00000000000..8864ccf6e4e --- /dev/null +++ b/ndb/src/cw/cpcd/CPCD.cpp @@ -0,0 +1,435 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#include <string.h> +#include <NdbOut.hpp> +#include <NdbUnistd.h> +#include <NdbStdio.h> +#include <errno.h> + +#include "APIService.hpp" +#include "CPCD.hpp" +#include <NdbMutex.h> + +#include "common.hpp" + +extern const ParserRow<CPCDAPISession> commands[]; + + +CPCD::CPCD() { + loadingProcessList = false; + m_processes.clear(); + m_monitor = NULL; + m_monitor = new Monitor(this); + m_procfile = "ndb_cpcd.db"; +} + +CPCD::~CPCD() { + if(m_monitor != NULL) { + delete m_monitor; + m_monitor = NULL; + } +} + +int +CPCD::findUniqueId() { + int id; + bool ok = false; + m_processes.lock(); + + while(!ok) { + ok = true; + id = random() % 8192; /* Don't want so big numbers */ + + if(id == 0) + ok = false; + + for(size_t i = 0; i<m_processes.size(); i++) { + if(m_processes[i]->m_id == id) + ok = false; + } + } + m_processes.unlock(); + return id; +} + +bool +CPCD::defineProcess(RequestStatus * rs, Process * arg){ + if(arg->m_id == -1) + arg->m_id = findUniqueId(); + + Guard tmp(m_processes); + + for(size_t i = 0; i<m_processes.size(); i++) { + Process * proc = m_processes[i]; + + if((strcmp(arg->m_name.c_str(), proc->m_name.c_str()) == 0) && + (strcmp(arg->m_group.c_str(), proc->m_group.c_str()) == 0)) { + /* Identical names in the same group */ + rs->err(AlreadyExists, "Name already exists"); + return false; + } + + if(arg->m_id == proc->m_id) { + /* Identical ID numbers */ + rs->err(AlreadyExists, "Id already exists"); + return false; + } + } + + m_processes.push_back(arg, false); + + notifyChanges(); + report(arg->m_id, CPCEvent::ET_PROC_USER_DEFINE); + + return true; +} + +bool +CPCD::undefineProcess(CPCD::RequestStatus *rs, int id) { + + Guard tmp(m_processes); + + Process * proc = 0; + size_t i; + for(i = 0; i < m_processes.size(); i++) { + if(m_processes[i]->m_id == id) { + proc = m_processes[i]; + break; + } + } + + if(proc == 0){ + rs->err(NotExists, "No such process"); + return false; + } + + switch(proc->m_status){ + case RUNNING: + case STOPPED: + case STOPPING: + case STARTING: + proc->stop(); + m_processes.erase(i, false /* Already locked */); + } + + + notifyChanges(); + + report(id, CPCEvent::ET_PROC_USER_UNDEFINE); + + return true; +} + +bool +CPCD::startProcess(CPCD::RequestStatus *rs, int id) { + + Process * proc = 0; + { + + Guard tmp(m_processes); + + for(size_t i = 0; i < m_processes.size(); i++) { + if(m_processes[i]->m_id == id) { + proc = m_processes[i]; + break; + } + } + + if(proc == 0){ + rs->err(NotExists, "No such process"); + return false; + } + + switch(proc->m_status){ + case STOPPED: + proc->m_status = STARTING; + if(proc->start() != 0){ + rs->err(Error, "Failed to start"); + return false; + } + break; + case STARTING: + rs->err(Error, "Already starting"); + return false; + case RUNNING: + rs->err(Error, "Already started"); + return false; + case STOPPING: + rs->err(Error, "Currently stopping"); + return false; + } + + notifyChanges(); + } + report(id, CPCEvent::ET_PROC_USER_START); + + return true; +} + +bool +CPCD::stopProcess(CPCD::RequestStatus *rs, int id) { + + Guard tmp(m_processes); + + Process * proc = 0; + for(size_t i = 0; i < m_processes.size(); i++) { + if(m_processes[i]->m_id == id) { + proc = m_processes[i]; + break; + } + } + + if(proc == 0){ + rs->err(NotExists, "No such process"); + return false; + } + + switch(proc->m_status){ + case STARTING: + case RUNNING: + proc->stop(); + break; + case STOPPED: + rs->err(AlreadyStopped, "Already stopped"); + return false; + break; + case STOPPING: + rs->err(Error, "Already stopping"); + return false; + } + + notifyChanges(); + + report(id, CPCEvent::ET_PROC_USER_START); + + return true; +} + +bool +CPCD::notifyChanges() { + bool ret = true; + if(!loadingProcessList) + ret = saveProcessList(); + + m_monitor->signal(); + + return ret; +} + +/* Must be called with m_processlist locked */ +bool +CPCD::saveProcessList(){ + char newfile[PATH_MAX+4]; + char oldfile[PATH_MAX+4]; + char curfile[PATH_MAX]; + FILE *f; + + /* Create the filenames that we will use later */ + snprintf(newfile, sizeof(newfile), "%s.new", m_procfile.c_str()); + snprintf(oldfile, sizeof(oldfile), "%s.old", m_procfile.c_str()); + snprintf(curfile, sizeof(curfile), "%s", m_procfile.c_str()); + + f = fopen(newfile, "w"); + + if(f == NULL) { + /* XXX What should be done here? */ + logger.critical("Cannot open `%s': %s\n", newfile, strerror(errno)); + return false; + } + + for(size_t i = 0; i<m_processes.size(); i++){ + m_processes[i]->print(f); + fprintf(f, "\n"); + + if(m_processes[i]->m_processType == TEMPORARY){ + /** + * Interactive process should never be "restarted" on cpcd restart + */ + continue; + } + + if(m_processes[i]->m_status == RUNNING || + m_processes[i]->m_status == STARTING){ + fprintf(f, "start process\nid: %d\n\n", m_processes[i]->m_id); + } + } + + fclose(f); + f = NULL; + + /* This will probably only work on reasonably Unix-like systems. You have + * been warned... + * + * The motivation behind all this link()ing is that the daemon might + * crash right in the middle of updating the configuration file, and in + * that case we want to be sure that the old file is around until we are + * guaranteed that there is always at least one copy of either the old or + * the new configuration file left. + */ + + /* Remove an old config file if it exists */ + unlink(oldfile); + + if(link(curfile, oldfile) != 0) /* make a backup of the running config */ + logger.error("Cannot rename '%s' -> '%s'", curfile, oldfile); + else { + if(unlink(curfile) != 0) { /* remove the running config file */ + logger.critical("Cannot remove file '%s'", curfile); + return false; + } + } + + if(link(newfile, curfile) != 0) { /* put the new config file in place */ + printf("-->%d\n", __LINE__); + + logger.critical("Cannot rename '%s' -> '%s': %s", + curfile, newfile, strerror(errno)); + return false; + } + + /* XXX Ideally we would fsync() the directory here, but I'm not sure if + * that actually works. + */ + + unlink(newfile); /* remove the temporary file */ + unlink(oldfile); /* remove the old file */ + + logger.info("Process list saved as '%s'", curfile); + + return true; +} + +bool +CPCD::loadProcessList(){ + BaseString secondfile; + FILE *f; + + loadingProcessList = true; + + secondfile.assfmt("%s.new", m_procfile.c_str()); + + /* Try to open the config file */ + f = fopen(m_procfile.c_str(), "r"); + + /* If it did not exist, try to open the backup. See the saveProcessList() + * method for an explanation why it is done this way. + */ + if(f == NULL) { + f = fopen(secondfile.c_str(), "r"); + + if(f == NULL) { + /* XXX What to do here? */ + logger.info("Configuration file `%s' not found", + m_procfile.c_str()); + logger.info("Starting with empty configuration"); + loadingProcessList = false; + return false; + } else { + logger.info("Configuration file `%s' missing", + m_procfile.c_str()); + logger.info("Backup configuration file `%s' is used", + secondfile.c_str()); + /* XXX Maybe we should just rename the backup file to the official + * name, and be done with it? + */ + } + } + + CPCDAPISession sess(f, *this); + sess.loadFile(); + loadingProcessList = false; + + Vector<int> temporary; + for(size_t i = 0; i<m_processes.size(); i++){ + Process * proc = m_processes[i]; + proc->readPid(); + if(proc->m_processType == TEMPORARY){ + temporary.push_back(proc->m_id); + } + } + + for(size_t i = 0; i<temporary.size(); i++){ + RequestStatus rs; + undefineProcess(&rs, temporary[i]); + } + + /* Don't call notifyChanges here, as that would save the file we just + loaded */ + m_monitor->signal(); + return true; +} + +MutexVector<CPCD::Process *> * +CPCD::getProcessList() { + return &m_processes; +} + +void +CPCD::RequestStatus::err(enum RequestStatusCode status, char *msg) { + m_status = status; + snprintf(m_errorstring, sizeof(m_errorstring), "%s", msg); +} + +#if 0 +void +CPCD::sigchild(int pid){ + m_processes.lock(); + for(size_t i = 0; i<m_processes.size(); i++){ + if(m_processes[i].m_pid == pid){ + } + } + wait(pid, 0, 0); +} +#endif + + /** Register event subscriber */ +void +CPCD::do_register(EventSubscriber * sub){ + m_subscribers.lock(); + m_subscribers.push_back(sub, false); + m_subscribers.unlock(); +} + +EventSubscriber* +CPCD::do_unregister(EventSubscriber * sub){ + m_subscribers.lock(); + + for(size_t i = 0; i<m_subscribers.size(); i++){ + if(m_subscribers[i] == sub){ + m_subscribers.erase(i); + m_subscribers.unlock(); + return sub; + } + } + + m_subscribers.unlock(); + return 0; +} + +void +CPCD::report(int id, CPCEvent::EventType t){ + CPCEvent e; + e.m_time = time(0); + e.m_proc = id; + e.m_type = t; + m_subscribers.lock(); + for(size_t i = 0; i<m_subscribers.size(); i++){ + (* m_subscribers[i]).report(e); + } + m_subscribers.unlock(); +} |