diff options
Diffstat (limited to 'ndb/src/kernel/blocks/ndbfs')
19 files changed, 4786 insertions, 0 deletions
diff --git a/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp b/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp new file mode 100644 index 00000000000..0e2aa4c6903 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp @@ -0,0 +1,1050 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +/** + * O_DIRECT + */ +#ifdef NDB_LINUX +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#endif + +#include "Error.hpp" +#include "AsyncFile.hpp" + +#include <ErrorHandlingMacros.hpp> +#include <kernel_types.h> +#include <string.h> +#include <NdbMem.h> +#include <NdbThread.h> +#include <signaldata/FsOpenReq.hpp> + +#include <sys/stat.h> +#include <fcntl.h> +#include <errno.h> + +#ifdef NDB_LINUX +// This is for pread and pwrite +#ifndef __USE_UNIX98 +#define __USE_UNIX98 +#endif +#endif + +#include <NdbUnistd.h> +#if defined NDB_WIN32 || defined NDB_OSE || defined NDB_SOFTOSE +#include <NdbStdio.h> +#else +// For readv and writev +#include <sys/uio.h> +#endif + +#ifndef NDB_WIN32 +#include <dirent.h> +#endif + +// Use this define if you want printouts from AsyncFile class +//#define DEBUG_ASYNCFILE + +#ifdef DEBUG_ASYNCFILE +#include <NdbOut.hpp> +#define DEBUG(x) x +#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f) +void printErrorAndFlags(Uint32 used_flags); +#else +#define DEBUG(x) +#define PRINT_ERRORANDFLAGS(f) +#endif + +// Define the size of the write buffer (for each thread) +#if defined NDB_SOFTOSE || defined NDB_OSE +#define WRITEBUFFERSIZE 65536 +#else +#define WRITEBUFFERSIZE 262144 +#endif + +const char *actionName[] = { + "open", + "close", + "closeRemove", + "read", + "readv", + "write", + "writev", + "writeSync", + "writevSync", + "sync", + "end" }; + +static int numAsyncFiles = 0; + +extern "C" void * runAsyncFile(void* arg) +{ + ((AsyncFile*)arg)->run(); + return (NULL); +} + +AsyncFile::AsyncFile() : + theFileName(), +#ifdef NDB_WIN32 + hFile(INVALID_HANDLE_VALUE), +#else + theFd(-1), +#endif + theReportTo(0), + theMemoryChannelPtr(NULL) +{ +} + +void +AsyncFile::doStart(const char * filesystemPath) { + theFileName.init(filesystemPath); + + // Stacksize for filesystem threads + // An 8k stack should be enough + const NDB_THREAD_STACKSIZE stackSize = 8192; + + char buf[16]; + numAsyncFiles++; + snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles); + + theStartMutexPtr = NdbMutex_Create(); + theStartConditionPtr = NdbCondition_Create(); + NdbMutex_Lock(theStartMutexPtr); + theStartFlag = false; + theThreadPtr = NdbThread_Create(runAsyncFile, + (void**)this, + stackSize, + (char*)&buf, + NDB_THREAD_PRIO_MEAN); + + NdbCondition_Wait(theStartConditionPtr, + theStartMutexPtr); + NdbMutex_Unlock(theStartMutexPtr); + NdbMutex_Destroy(theStartMutexPtr); + NdbCondition_Destroy(theStartConditionPtr); +} + +AsyncFile::~AsyncFile() +{ + void *status; + Request request; + request.action = Request::end; + theMemoryChannelPtr->writeChannel( &request ); + NdbThread_WaitFor(theThreadPtr, &status); + NdbThread_Destroy(&theThreadPtr); + delete theMemoryChannelPtr; +} + +void +AsyncFile::reportTo( MemoryChannel<Request> *reportTo ) +{ + theReportTo = reportTo; +} + +void AsyncFile::execute(Request* request) +{ + theMemoryChannelPtr->writeChannel( request ); +} + +void +AsyncFile::run() +{ + Request *request; + // Create theMemoryChannel in the thread that will wait for it + NdbMutex_Lock(theStartMutexPtr); + theMemoryChannelPtr = new MemoryChannel<Request>(); + theStartFlag = true; + // Create write buffer for bigger writes + theWriteBufferSize = WRITEBUFFERSIZE; + theWriteBuffer = (char *) NdbMem_Allocate(theWriteBufferSize); + NdbMutex_Unlock(theStartMutexPtr); + NdbCondition_Signal(theStartConditionPtr); + + if (!theWriteBuffer) { + DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer")); + return; + }//if + + while (1) { + request = theMemoryChannelPtr->readChannel(); + if (!request) { + DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile")); + endReq(); + return; + }//if + switch (request->action) { + case Request:: open: + openReq(request); + break; + case Request:: close: + closeReq(request); + break; + case Request:: closeRemove: + closeReq(request); + removeReq(request); + break; + case Request:: read: + readReq(request); + break; + case Request:: readv: + readvReq(request); + break; + case Request:: write: + writeReq(request); + break; + case Request:: writev: + writevReq(request); + break; + case Request:: writeSync: + writeReq(request); + syncReq(request); + break; + case Request:: writevSync: + writevReq(request); + syncReq(request); + break; + case Request:: sync: + syncReq(request); + break; + case Request:: append: + appendReq(request); + break; + case Request::rmrf: + rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory); + break; + case Request:: end: + closeReq(request); + endReq(); + return; + default: + THREAD_REQUIRE(false, "Using default switch in AsyncFile::run"); + break; + }//switch + theReportTo->writeChannel(request); + }//while +}//AsyncFile::run() + +extern bool Global_useO_SYNC; +extern bool Global_useO_DIRECT; +extern bool Global_unlinkO_CREAT; +extern Uint32 Global_syncFreq; + +void AsyncFile::openReq(Request* request) +{ + m_openedWithSync = false; + m_syncFrequency = 0; + + // for open.flags, see signal FSOPENREQ +#ifdef NDB_WIN32 + DWORD dwCreationDisposition; + DWORD dwDesiredAccess = 0; + DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE; + DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_NO_BUFFERING; + const Uint32 flags = request->par.open.flags; + + // Convert file open flags from Solaris to Windows + if ((flags & FsOpenReq::OM_CREATE) && (flags & FsOpenReq::OM_TRUNCATE)){ + dwCreationDisposition = CREATE_ALWAYS; + } else if (flags & FsOpenReq::OM_TRUNCATE){ + dwCreationDisposition = TRUNCATE_EXISTING; + } else if (flags & FsOpenReq::OM_CREATE){ + dwCreationDisposition = CREATE_NEW; + } else { + dwCreationDisposition = OPEN_EXISTING; + } + + switch(flags & 3){ + case FsOpenReq::OM_READONLY: + dwDesiredAccess = GENERIC_READ; + break; + case FsOpenReq::OM_WRITEONLY: + dwDesiredAccess = GENERIC_WRITE; + break; + case FsOpenReq::OM_READWRITE: + dwDesiredAccess = GENERIC_READ | GENERIC_WRITE; + break; + default: + request->error = 1000; + break; + return; + } + + hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode, + 0, dwCreationDisposition, dwFlagsAndAttributes, 0); + + if(INVALID_HANDLE_VALUE == hFile) { + request->error = GetLastError(); + if(((ERROR_PATH_NOT_FOUND == request->error) || (ERROR_INVALID_NAME == request->error)) + && (flags & FsOpenReq::OM_CREATE)) { + createDirectories(); + hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode, + 0, dwCreationDisposition, dwFlagsAndAttributes, 0); + + if(INVALID_HANDLE_VALUE == hFile) + request->error = GetLastError(); + else + request->error = 0; + + return; + } + } + else { + request->error = 0; + return; + } +#else + const Uint32 flags = request->par.open.flags; + Uint32 new_flags = 0; + + // Convert file open flags from Solaris to Liux + if(flags & FsOpenReq::OM_CREATE){ + new_flags |= O_CREAT; + } + + if(flags & FsOpenReq::OM_TRUNCATE){ +#if 0 + if(Global_unlinkO_CREAT){ + unlink(theFileName.c_str()); + } else +#endif + new_flags |= O_TRUNC; + } + + if(flags & FsOpenReq::OM_APPEND){ + new_flags |= O_APPEND; + } + + if(flags & FsOpenReq::OM_SYNC){ +#if 0 + if(Global_useO_SYNC){ + new_flags |= O_SYNC; + m_openedWithSync = true; + m_syncFrequency = 0; + } else { +#endif + m_openedWithSync = false; + m_syncCount = 0; + m_syncFrequency = Global_syncFreq; +#if 0 + } +#endif + } else { + m_openedWithSync = false; + m_syncFrequency = 0; + } + +#if 0 +#if NDB_LINUX + if(Global_useO_DIRECT){ + new_flags |= O_DIRECT; + } +#endif +#endif + + switch(flags & 0x3){ + case FsOpenReq::OM_READONLY: + new_flags |= O_RDONLY; + break; + case FsOpenReq::OM_WRITEONLY: + new_flags |= O_WRONLY; + break; + case FsOpenReq::OM_READWRITE: + new_flags |= O_RDWR; + break; + default: + request->error = 1000; + break; + return; + } + const int mode = S_IRUSR | S_IWUSR | S_IRGRP; + + if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) { + PRINT_ERRORANDFLAGS(new_flags); + if( (errno == ENOENT ) && (new_flags & O_CREAT ) ) { + createDirectories(); + if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) { + PRINT_ERRORANDFLAGS(new_flags); + request->error = errno; + } + } else { + request->error = errno; + } + } +#endif +} + +int +AsyncFile::readBuffer(char * buf, size_t size, off_t offset){ + int return_value; + +#ifdef NDB_WIN32 + DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN); + if(dwSFP != offset) { + return GetLastError(); + } +#elif defined NDB_OSE || defined NDB_SOFTOSE + return_value = lseek(theFd, offset, SEEK_SET); + if (return_value != offset) { + return errno; + } +#endif + + while (size > 0) { + size_t bytes_read = 0; + +#ifdef NDB_WIN32 + DWORD dwBytesRead; + BOOL bRead = ReadFile(hFile, + buf, + size, + &dwBytesRead, + 0); + if(!bRead){ + return GetLastError(); + } + bytes_read = dwBytesRead; +#elif defined NDB_OSE || defined NDB_SOFTOSE + return_value = ::read(theFd, buf, size); +#else // UNIX + return_value = ::pread(theFd, buf, size, offset); +#endif +#ifndef NDB_WIN32 + if (return_value == -1 && errno == EINTR) { + DEBUG(ndbout_c("EINTR in read")); + continue; + } else if (return_value == -1){ + return errno; + } else { + bytes_read = return_value; + } +#endif + + if(bytes_read == 0){ + DEBUG(ndbout_c("Read underflow %d %d\n %x\n%d %d", + size, offset, buf, bytes_read, return_value)); + return ERR_ReadUnderflow; + } + + if(bytes_read != size){ + DEBUG(ndbout_c("Warning partial read %d != %d", + bytes_read, size)); + } + + buf += bytes_read; + size -= bytes_read; + offset += bytes_read; + } + return 0; +} + +void +AsyncFile::readReq( Request * request) +{ + for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) { + off_t offset = request->par.readWrite.pages[i].offset; + size_t size = request->par.readWrite.pages[i].size; + char * buf = request->par.readWrite.pages[i].buf; + + int err = readBuffer(buf, size, offset); + if(err != 0){ + request->error = err; + return; + } + } +} + +void +AsyncFile::readvReq( Request * request) +{ +#if defined NDB_OSE || defined NDB_SOFTOSE + readReq(request); + return; +#elif defined NDB_WIN32 + // ReadFileScatter? + readReq(request); + return; +#else + int return_value; + int length = 0; + struct iovec iov[20]; // the parameter in the signal restricts this to 20 deep + for(int i=0; i < request->par.readWrite.numberOfPages ; i++) { + iov[i].iov_base= request->par.readWrite.pages[i].buf; + iov[i].iov_len= request->par.readWrite.pages[i].size; + length = length + iov[i].iov_len; + } + lseek( theFd, request->par.readWrite.pages[0].offset, SEEK_SET ); + return_value = ::readv(theFd, iov, request->par.readWrite.numberOfPages); + if (return_value == -1) { + request->error = errno; + return; + } else if (return_value != length) { + request->error = 1011; + return; + } +#endif +} + +int +AsyncFile::extendfile(Request* request) { +#if defined NDB_OSE || defined NDB_SOFTOSE + // Find max size of this file in this request + int maxOffset = 0; + int maxSize = 0; + for(int i=0; i < request->par.readWrite.numberOfPages ; i++) { + if (request->par.readWrite.pages[i].offset > maxOffset) { + maxOffset = request->par.readWrite.pages[i].offset; + maxSize = request->par.readWrite.pages[i].size; + } + } + DEBUG(ndbout_c("extendfile: maxOffset=%d, size=%d", maxOffset, maxSize)); + + // Allocate a buffer and fill it with zeros + void* pbuf = malloc(maxSize); + memset(pbuf, 0, maxSize); + for (int p = 0; p <= maxOffset; p = p + maxSize) { + int return_value; + return_value = lseek(theFd, + p, + SEEK_SET); + if((return_value == -1 ) || (return_value != p)) { + return -1; + } + return_value = ::write(theFd, + pbuf, + maxSize); + if ((return_value == -1) || (return_value != maxSize)) { + return -1; + } + } + free(pbuf); + + DEBUG(ndbout_c("extendfile: \"%s\" OK!", theFileName.c_str())); + return 0; +#else + request = request; + abort(); + return -1; +#endif +} + +void +AsyncFile::writeReq( Request * request) +{ + int page_num = 0; + bool write_not_complete = true; + + while(write_not_complete) { + int totsize = 0; + off_t offset = request->par.readWrite.pages[page_num].offset; + char* bufptr = theWriteBuffer; + + write_not_complete = false; + if (request->par.readWrite.numberOfPages > 1) { + off_t page_offset = offset; + + // Multiple page write, copy to buffer for one write + for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) { + memcpy(bufptr, + request->par.readWrite.pages[i].buf, + request->par.readWrite.pages[i].size); + bufptr += request->par.readWrite.pages[i].size; + totsize += request->par.readWrite.pages[i].size; + if (((i + 1) < request->par.readWrite.numberOfPages)) { + // There are more pages to write + // Check that offsets are consequtive + if ((page_offset + request->par.readWrite.pages[i].size) + != + request->par.readWrite.pages[i+1].offset) { + // Next page is not aligned with previous, not allowed + DEBUG(ndbout_c("Page offsets are not aligned")); + request->error = EINVAL; + return; + } + if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) { + // We are not finished and the buffer is full + write_not_complete = true; + // Start again with next page + page_num = i + 1; + break; + } + } + page_offset += request->par.readWrite.pages[i].size; + } + bufptr = theWriteBuffer; + } else { + // One page write, write page directly + bufptr = request->par.readWrite.pages[0].buf; + totsize = request->par.readWrite.pages[0].size; + } + int err = writeBuffer(bufptr, totsize, offset); + if(err != 0){ + request->error = err; + return; + } + } // while(write_not_complete) +} + +int +AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset, + size_t chunk_size) +{ + size_t bytes_to_write = chunk_size; + int return_value; + +#ifdef NDB_WIN32 + DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN); + if(dwSFP != offset) { + return GetLastError(); + } +#elif defined NDB_OSE || defined NDB_SOFTOSE + return_value = lseek(theFd, offset, SEEK_SET); + if (return_value != offset) { + DEBUG(ndbout_c("AsyncFile::writeReq, err1: return_value=%d, offset=%d\n", + return_value, chunk_offset)); + PRINT_ERRORANDFLAGS(0); + if (errno == 78) { + // Could not write beyond end of file, try to extend file + DEBUG(ndbout_c("AsyncFile::writeReq, Extend. file! filename=\"%s\" \n", + theFileName.c_str())); + return_value = extendfile(request); + if (return_value == -1) { + return errno; + } + return_value = lseek(theFd, offset, SEEK_SET); + if (return_value != offset) { + return errno; + } + } else { + return errno; + } + } +#endif + + while (size > 0) { + if (size < bytes_to_write){ + // We are at the last chunk + bytes_to_write = size; + } + size_t bytes_written = 0; + +#ifdef NDB_WIN32 + DWORD dwWritten; + BOOL bWrite = WriteFile(hFile, buf, bytes_to_write, &dwWritten, 0); + if(!bWrite) { + return GetLastError(); + } + bytes_written = dwWritten; + if (bytes_written != bytes_to_write) { + DEBUG(ndbout_c("Warning partial write %d != %d", bytes_written, bytes_to_write)); + } + +#elif defined NDB_OSE || defined NDB_SOFTOSE + return_value = ::write(theFd, buf, bytes_to_write); +#else // UNIX + return_value = ::pwrite(theFd, buf, bytes_to_write, offset); +#endif +#ifndef NDB_WIN32 + if (return_value == -1 && errno == EINTR) { + bytes_written = 0; + DEBUG(ndbout_c("EINTR in write")); + } else if (return_value == -1){ + return errno; + } else { + bytes_written = return_value; + + if(bytes_written == 0){ + abort(); + } + + if(bytes_written != bytes_to_write){ + DEBUG(ndbout_c("Warning partial write %d != %d", + bytes_written, bytes_to_write)); + } + } +#endif + + buf += bytes_written; + size -= bytes_written; + offset += bytes_written; + } + return 0; +} + +void +AsyncFile::writevReq( Request * request) +{ + // WriteFileGather on WIN32? + writeReq(request); +} + + +void +AsyncFile::closeReq(Request * request) +{ + syncReq(request); +#ifdef NDB_WIN32 + if(!CloseHandle(hFile)) { + request->error = GetLastError(); + } + hFile = INVALID_HANDLE_VALUE; +#else + if (-1 == ::close(theFd)) { + request->error = errno; + } + theFd = -1; +#endif +} + +bool AsyncFile::isOpen(){ +#ifdef NDB_WIN32 + return (hFile != INVALID_HANDLE_VALUE); +#else + return (theFd != -1); +#endif +} + + +void +AsyncFile::syncReq(Request * request) +{ + if(m_openedWithSync){ + return; + } +#ifdef NDB_WIN32 + if(!FlushFileBuffers(hFile)) { + request->error = GetLastError(); + return; + } +#else + if (-1 == ::fsync(theFd)){ + request->error = errno; + return; + } +#endif + m_syncCount = 0; +} + +void +AsyncFile::appendReq(Request * request){ + + const char * buf = request->par.append.buf; + Uint32 size = request->par.append.size; + + m_syncCount += size; + +#ifdef NDB_WIN32 + DWORD dwWritten = 0; + while(size > 0){ + if(!WriteFile(hFile, buf, size, &dwWritten, 0)){ + request->error = GetLastError(); + return ; + } + + buf += dwWritten; + size -= dwWritten; + } +#else + while(size > 0){ + const int n = write(theFd, buf, size); + if(n == -1 && errno == EINTR){ + continue; + } + if(n == -1){ + request->error = errno; + return; + } + if(n == 0){ + abort(); + } + size -= n; + buf += n; + } +#endif + + if(m_syncFrequency != 0 && m_syncCount > m_syncFrequency){ + syncReq(request); + request->error = 0; + } +} + +void +AsyncFile::removeReq(Request * request) +{ +#ifdef NDB_WIN32 + if(!DeleteFile(theFileName.c_str())) { + request->error = GetLastError(); + } +#else + if (-1 == ::remove(theFileName.c_str())) { + request->error = errno; + + } +#endif +} + +void +AsyncFile::rmrfReq(Request * request, char * path, bool removePath){ + Uint32 path_len = strlen(path); + Uint32 path_max_copy = PATH_MAX - path_len; + char* path_add = &path[path_len]; +#ifndef NDB_WIN32 + if(!request->par.rmrf.directory){ + // Remove file + if(unlink((const char *)path) != 0 && errno != ENOENT) + request->error = errno; + return; + } + // Remove directory + DIR* dirp = opendir((const char *)path); + if(dirp == 0){ + if(errno != ENOENT) + request->error = errno; + return; + } + struct dirent * dp; + while ((dp = readdir(dirp)) != NULL){ + if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0)) { + snprintf(path_add, (size_t)path_max_copy, "%s%s", + DIR_SEPARATOR, dp->d_name); + if(remove((const char*)path) == 0){ + path[path_len] = 0; + continue; + } + + rmrfReq(request, path, true); + path[path_len] = 0; + if(request->error != 0){ + closedir(dirp); + return; + } + } + } + closedir(dirp); + if(removePath && rmdir((const char *)path) != 0){ + request->error = errno; + } + return; +#else + + if(!request->par.rmrf.directory){ + // Remove file + if(!DeleteFile(path)){ + DWORD dwError = GetLastError(); + if(dwError!=ERROR_FILE_NOT_FOUND) + request->error = dwError; + } + return; + } + + strcat(path, "\\*"); + WIN32_FIND_DATA ffd; + HANDLE hFindFile = FindFirstFile(path, &ffd); + path[path_len] = 0; + if(INVALID_HANDLE_VALUE==hFindFile){ + DWORD dwError = GetLastError(); + if(dwError!=ERROR_PATH_NOT_FOUND) + request->error = dwError; + return; + } + + do { + if(0!=strcmp(".", ffd.cFileName) && 0!=strcmp("..", ffd.cFileName)){ + strcat(path, "\\"); + strcat(path, ffd.cFileName); + if(DeleteFile(path)) { + path[path_len] = 0; + continue; + }//if + + rmrfReq(request, path, true); + path[path_len] = 0; + if(request->error != 0){ + FindClose(hFindFile); + return; + } + } + } while(FindNextFile(hFindFile, &ffd)); + + FindClose(hFindFile); + + if(removePath && !RemoveDirectory(path)) + request->error = GetLastError(); + +#endif +} + +void AsyncFile::endReq() +{ + // Thread is ended with return + if (theWriteBuffer) NdbMem_Free(theWriteBuffer); + NdbThread_Exit(0); +} + + +void AsyncFile::createDirectories() +{ + for (int i = 0; i < theFileName.levels(); i++) { +#ifdef NDB_WIN32 + CreateDirectory(theFileName.directory(i), 0); +#else + //printf("AsyncFile::createDirectories : \"%s\"\n", theFileName.directory(i)); + mkdir(theFileName.directory(i), S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP); +#endif + } +} + +#ifdef DEBUG_ASYNCFILE +void printErrorAndFlags(Uint32 used_flags) { + char buf[255]; + sprintf(buf, "PEAF: errno=%d \"", errno); + + switch(errno) { + case EACCES: + strcat(buf, "EACCES"); + break; + case EDQUOT: + strcat(buf, "EDQUOT"); + break; + case EEXIST : + strcat(buf, "EEXIST"); + break; + case EINTR : + strcat(buf, "EINTR"); + break; + case EFAULT : + strcat(buf, "EFAULT"); + break; + case EIO : + strcat(buf, "EIO"); + break; + case EISDIR : + strcat(buf, "EISDIR"); + break; + case ELOOP : + strcat(buf, "ELOOP"); + break; + case EMFILE : + strcat(buf, "EMFILE"); + break; + case ENFILE : + strcat(buf, "ENFILE"); + break; + case ENOENT : + strcat(buf, "ENOENT "); + break; + case ENOSPC : + strcat(buf, "ENOSPC"); + break; + case ENOTDIR : + strcat(buf, "ENOTDIR"); + break; + case ENXIO : + strcat(buf, "ENXIO"); + break; + case EOPNOTSUPP: + strcat(buf, "EOPNOTSUPP"); + break; +#if !defined NDB_OSE && !defined NDB_SOFTOSE + case EMULTIHOP : + strcat(buf, "EMULTIHOP"); + break; + case ENOLINK : + strcat(buf, "ENOLINK"); + break; + case ENOSR : + strcat(buf, "ENOSR"); + break; + case EOVERFLOW : + strcat(buf, "EOVERFLOW"); + break; +#endif + case EROFS : + strcat(buf, "EROFS"); + break; + case EAGAIN : + strcat(buf, "EAGAIN"); + break; + case EINVAL : + strcat(buf, "EINVAL"); + break; + case ENOMEM : + strcat(buf, "ENOMEM"); + break; + case ETXTBSY : + strcat(buf, "ETXTBSY"); + break; + case ENAMETOOLONG: + strcat(buf, "ENAMETOOLONG"); + break; + case EBADF: + strcat(buf, "EBADF"); + break; + case ESPIPE: + strcat(buf, "ESPIPE"); + break; + case ESTALE: + strcat(buf, "ESTALE"); + break; + default: + strcat(buf, "EOTHER"); + break; + } + strcat(buf, "\" "); +#if defined NDB_OSE + strcat(buf, strerror(errno) << " "); +#endif + strcat(buf, " flags: "); + switch(used_flags & 3){ + case O_RDONLY: + strcat(buf, "O_RDONLY, "); + break; + case O_WRONLY: + strcat(buf, "O_WRONLY, "); + break; + case O_RDWR: + strcat(buf, "O_RDWR, "); + break; + default: + strcat(buf, "Unknown!!, "); + } + + if((used_flags & O_APPEND)==O_APPEND) + strcat(buf, "O_APPEND, "); + if((used_flags & O_CREAT)==O_CREAT) + strcat(buf, "O_CREAT, "); + if((used_flags & O_EXCL)==O_EXCL) + strcat(buf, "O_EXCL, "); + if((used_flags & O_NOCTTY) == O_NOCTTY) + strcat(buf, "O_NOCTTY, "); + if((used_flags & O_NONBLOCK)==O_NONBLOCK) + strcat(buf, "O_NONBLOCK, "); + if((used_flags & O_TRUNC)==O_TRUNC) + strcat(buf, "O_TRUNC, "); +#if !defined NDB_OSE && !defined NDB_SOFTOSE + if((used_flags & O_DSYNC)==O_DSYNC) + strcat(buf, "O_DSYNC, "); + if((used_flags & O_NDELAY)==O_NDELAY) + strcat(buf, "O_NDELAY, "); + if((used_flags & O_RSYNC)==O_RSYNC) + strcat(buf, "O_RSYNC, "); + if((used_flags & O_SYNC)==O_SYNC) + strcat(buf, "O_SYNC, "); + DEBUG(ndbout_c(buf)); +#endif + +} +#endif diff --git a/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp b/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp new file mode 100644 index 00000000000..caa03e52d0c --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp @@ -0,0 +1,234 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef AsyncFile_H +#define AsyncFile_H + +//=========================================================================== +// +// .DESCRIPTION +// Asynchronous file, All actions are executed concurrently with other +// activity of the process. +// Because all action are performed in a seperated thread the result of +// of a action is send back tru a memory channel. +// For the asyncronise notivication of a finished request all the calls +// have a request as paramater, the user can use the userData pointer +// to add information it needs when the request is send back. +// +// +// .TYPICAL USE: +// Writing or reading data to/from disk concurrently to other activities. +// +//=========================================================================== +//============================================================================= +// +// .PUBLIC +// +//============================================================================= +/////////////////////////////////////////////////////////////////////////////// +// +// AsyncFile( ); +// Description: +// Initialisation of the class. +// Parameters: +// - +/////////////////////////////////////////////////////////////////////////////// +// +// ~AsyncFile( ); +// Description: +// Tell the thread to stop and wait for it to return +// Parameters: +// - +/////////////////////////////////////////////////////////////////////////////// +// +// doStart( ); +// Description: +// Spawns the new thread. +// Parameters: +// Base path of filesystem +// +/////////////////////////////////////////////////////////////////////////////// +// +// void execute(Request *request); +// Description: +// performens the requered action. +// Parameters: +// request: request to be called when open is finished. +// action= open|close|read|write|sync +// if action is open then: +// par.open.flags= UNIX open flags, see man open +// par.open.name= name of the file to open +// if action is read or write then: +// par.readWrite.buf= user provided buffer to read/write +// the data from/to +// par.readWrite.size= how many bytes must be read/written +// par.readWrite.offset= absolute offset in file in bytes +// return: +// return values are stored in the request error field: +// error= return state of the action, UNIX error see man open/errno +// userData= is untouched can be used be user. +// +/////////////////////////////////////////////////////////////////////////////// +// +// void reportTo( MemoryChannel<Request> *reportTo ); +// Description: +// set the channel where the file must report the result of the +// actions back to. +// Parameters: +// reportTo: the memory channel to use use MemoryChannelMultipleWriter +// if more +// than one file uses this channel to report back. +// +/////////////////////////////////////////////////////////////////////////////// + +#include <kernel_types.h> +#include "MemoryChannel.hpp" +#include "Filename.hpp" + +const int ERR_ReadUnderflow = 1000; + +const int WRITECHUNK = 262144; + +class AsyncFile; + +class Request +{ +public: + enum Action { + open, + close, + closeRemove, + read, // Allways leave readv directly after + // read because SimblockAsyncFileSystem depends on it + readv, + write,// Allways leave writev directly after + // write because SimblockAsyncFileSystem depends on it + writev, + writeSync,// Allways leave writevSync directly after + // writeSync because SimblockAsyncFileSystem depends on it + writevSync, + sync, + end, + append, + rmrf + }; + Action action; + union { + struct { + Uint32 flags; + } open; + struct { + int numberOfPages; + struct{ + char *buf; + size_t size; + off_t offset; + } pages[16]; + } readWrite; + struct { + const char * buf; + size_t size; + } append; + struct { + bool directory; + bool own_directory; + } rmrf; + } par; + int error; + + void set(BlockReference userReference, + Uint32 userPointer, + Uint16 filePointer); + BlockReference theUserReference; + Uint32 theUserPointer; + Uint16 theFilePointer; + // Information for open, needed if the first open action fails. + AsyncFile* file; + Uint32 theTrace; +}; + + +inline +void +Request::set(BlockReference userReference, + Uint32 userPointer, Uint16 filePointer) +{ + theUserReference= userReference; + theUserPointer= userPointer; + theFilePointer= filePointer; +} + +class AsyncFile +{ +public: + AsyncFile(); + ~AsyncFile(); + + void reportTo( MemoryChannel<Request> *reportTo ); + + void execute( Request* request ); + + void doStart(const char * fspath); + // its a thread so its always running + void run(); + + bool isOpen(); + + Filename theFileName; +private: + + void openReq(Request *request); + void readReq(Request *request); + void readvReq(Request *request); + void writeReq(Request *request); + void writevReq(Request *request); + + void closeReq(Request *request); + void syncReq(Request *request); + void removeReq(Request *request); + void appendReq(Request *request); + void rmrfReq(Request *request, char * path, bool removePath); + void endReq(); + + int readBuffer(char * buf, size_t size, off_t offset); + int writeBuffer(const char * buf, size_t size, off_t offset, + size_t chunk_size = WRITECHUNK); + + int extendfile(Request* request); + void createDirectories(); + +#ifdef NDB_WIN32 + HANDLE hFile; +#else + int theFd; +#endif + + MemoryChannel<Request> *theReportTo; + MemoryChannel<Request>* theMemoryChannelPtr; + + struct NdbThread* theThreadPtr; + NdbMutex* theStartMutexPtr; + NdbCondition* theStartConditionPtr; + bool theStartFlag; + int theWriteBufferSize; + char* theWriteBuffer; + + bool m_openedWithSync; + Uint32 m_syncCount; + Uint32 m_syncFrequency; +}; + +#endif diff --git a/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/AsyncFileTest.cpp b/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/AsyncFileTest.cpp new file mode 100644 index 00000000000..b9954ba130f --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/AsyncFileTest.cpp @@ -0,0 +1,697 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +//#define TESTDEBUG 1 + +#include <sys/stat.h> +#include <fcntl.h> + +#include <string.h> +#include <kernel_types.h> +#include <Pool.hpp> +#include "AsyncFile.hpp" +#include "NdbOut.hpp" +#include "NdbTick.h" +#include "NdbThread.h" +#include "NdbMain.h" + +// Test and benchmark functionality of AsyncFile +// -n Number of files +// -r Number of simultaneous requests +// -s Filesize, number of pages +// -l Number of iterations +// -remove, remove files after close +// -reverse, write files in reverse order, start with the last page + +#define MAXFILES 255 +#define DEFAULT_NUM_FILES 1 +#define MAXREQUESTS 256 +#define DEFAULT_NUM_REQUESTS 1 +#define MAXFILESIZE 4096 +#define DEFAULT_FILESIZE 2048 +#define FVERSION 0x01000000 +#define PAGESIZE 8192 + +#define TIMER_START { Uint64 starttick = NdbTick_CurrentMillisecond() +#define TIMER_PRINT(str, ops) Uint64 stoptick = NdbTick_CurrentMillisecond();\ + Uint64 totaltime = (stoptick-starttick); \ + ndbout << ops << " " << str << \ + " total time " << (int)totaltime << "ms" << endl;\ + char buf[255];\ + sprintf(buf, "%d %s/sec\n",(int)((ops*1000)/totaltime), str);\ + ndbout <<buf << endl;} + +static int numberOfFiles = DEFAULT_NUM_FILES; +static int numberOfRequests = DEFAULT_NUM_REQUESTS; +static int fileSize = DEFAULT_FILESIZE; +static int removeFiles = 0; +static int writeFilesReverse = 0; +static int numberOfIterations = 1; +Uint32 FileNameArray[4]; + +Pool<AsyncFile>* files; +AsyncFile* openFiles[MAXFILES]; +Pool<Request>* theRequestPool; +MemoryChannelMultipleWriter<Request>* theReportChannel; + +char WritePages[MAXFILES][PAGESIZE]; +char ReadPages[MAXFILES][PAGESIZE]; + +int readArguments(int argc, const char** argv); +int openFile(int fileNum); +int openFileWait(); +int closeFile(int fileNum); +int closeFileWait(); +int writeFile( int fileNum, int pagenum); +int writeFileWait(); +int writeSyncFile( int fileNum, int pagenum); +int writeSyncFileWait(); +int readFile( int fileNum, int pagenum); +int readFileWait(); + + +NDB_COMMAND(aftest, "aftest", "aftest [-n <Number of files>] [-r <Number of simultaneous requests>] [-s <Filesize, number of pages>] [-l <Number of iterations>] [-remove, remove files after close] [-reverse, write files in reverse order, start with the last page]", "Test the AsyncFile class of Ndb", 8192) +{ + int s, numReq, numOps; + + readArguments(argc, argv); + + files = new Pool<AsyncFile>(numberOfFiles, 2); + theRequestPool = new Pool<Request>; + theReportChannel = new MemoryChannelMultipleWriter<Request>; + + ndbout << "AsyncFileTest starting" << endl; + ndbout << " " << numberOfFiles << " files" << endl; + ndbout << " " << numberOfRequests << " requests" << endl; + ndbout << " " << fileSize << " * 8k files" << endl << endl; + ndbout << " " << numberOfIterations << " iterations" << endl << endl; + + NdbThread_SetConcurrencyLevel(numberOfFiles+2); + + // initialize data to write to files + for (int i = 0; i < MAXFILES; i++) { + for (int j = 0; j < PAGESIZE; j++){ + WritePages[i][j] = (64+i+j)%256; + } + // memset(&WritePages[i][0], i+64, PAGESIZE); + } + + // Set file directory and name + // /T27/F27/NDBFS/S27Pnn.data + FileNameArray[0] = 27; // T27 + FileNameArray[1] = 27; // F27 + FileNameArray[2] = 27; // S27 + FileNameArray[3] = FVERSION; // Version + + for (int l = 0; l < numberOfIterations; l++) + { + + ndbout << "Opening files" << endl; + // Open files + for (int f = 0; f < numberOfFiles; f++) + { + openFile(f); + + } + + // Wait for answer + openFileWait(); + + ndbout << "Files opened!" << endl<< endl; + + // Write to files + ndbout << "Started writing" << endl; + TIMER_START; + s = 0; + numReq = 0; + numOps = 0; + while ( s < fileSize) + { + for (int r = 0; r < numberOfRequests; r++) + { + for (int f = 0; f < numberOfFiles; f++) + { + writeFile(f, s); + numReq++; + numOps++; + } + + s++; + } + + while (numReq > 0) + { + writeFileWait(); + numReq--; + } + + } + + TIMER_PRINT("writes", numOps); + + + ndbout << "Started reading" << endl; + TIMER_START; + + // Read from files + s = 0; + numReq = 0; + numOps = 0; + while ( s < fileSize) + { + for (int r = 0; r < numberOfRequests; r++) + { + for (int f = 0; f < numberOfFiles; f++) + { + readFile(f, s); + numReq++; + numOps++; + } + + s++; + + } + + while (numReq > 0) + { + readFileWait(); + numReq--; + } + + } + TIMER_PRINT("reads", numOps); + + ndbout << "Started writing with sync" << endl; + TIMER_START; + + // Write to files + s = 0; + numReq = 0; + numOps = 0; + while ( s < fileSize) + { + for (int r = 0; r < numberOfRequests; r++) + { + for (int f = 0; f < numberOfFiles; f++) + { + writeSyncFile(f, s); + numReq++; + numOps++; + } + + s++; + } + + while (numReq > 0) + { + writeSyncFileWait(); + numReq--; + } + + } + + TIMER_PRINT("writeSync", numOps); + + // Close files + ndbout << "Closing files" << endl; + for (int f = 0; f < numberOfFiles; f++) + { + closeFile(f); + + } + + // Wait for answer + closeFileWait(); + + ndbout << "Files closed!" << endl<< endl; + } + + // Deallocate memory + delete files; + delete theReportChannel; + delete theRequestPool; + + return 0; + +} + + + +int forward( AsyncFile * file, Request* request ) +{ + file->execute(request); + ERROR_CHECK 0; + return 1; +} + +int openFile( int fileNum) +{ + AsyncFile* file = (AsyncFile *)files->get(); + + FileNameArray[3] = fileNum | FVERSION; + file->fileName().set( NDBFS_REF, &FileNameArray[0] ); + ndbout << "openFile: " << file->fileName().c_str() << endl; + + if( ERROR_STATE ) { + ERROR_RESET; + files->put( file ); + ndbout << "Failed to set filename" << endl; + return 1; + } + file->reportTo(theReportChannel); + + Request* request = theRequestPool->get(); + request->action= Request::open; + request->error= 0; + request->par.open.flags = 0x302; //O_RDWR | O_CREAT | O_TRUNC ; // 770 + request->set(NDBFS_REF, 0x23456789, fileNum ); + request->file = file; + + if (!forward(file,request)) { + // Something went wrong + ndbout << "Could not forward open request" << endl; + theRequestPool->put(request); + return 1; + } + return 0; +} + +int closeFile( int fileNum) +{ + + AsyncFile* file = openFiles[fileNum]; + + Request* request = theRequestPool->get(); + if (removeFiles == 1) + request->action = Request::closeRemove; + else + request->action= Request::close; + + request->error= 0; + request->set(NDBFS_REF, 0x23456789, fileNum ); + request->file = file; + + if (!forward(file,request)) { + // Something went wrong + ndbout << "Could not forward close request" << endl; + theRequestPool->put(request); + return 1; + } + return 0; +} + +int writeFile( int fileNum, int pagenum) +{ + AsyncFile* file = openFiles[fileNum]; +#ifdef TESTDEBUG + ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str()<< endl; +#endif + Request *request = theRequestPool->get(); + request->action = Request::write; + request->error = 0; + request->set(NDBFS_REF, pagenum, fileNum); + request->file = openFiles[fileNum]; + + // Write only one page, choose the correct page for each file using fileNum + request->par.readWrite.pages[0].buf = &WritePages[fileNum][0]; + request->par.readWrite.pages[0].size = PAGESIZE; + if (writeFilesReverse == 1) + { + // write the last page in the files first + // This is a normal way for the Blocks in Ndb to write to a file + request->par.readWrite.pages[0].offset = (fileSize - pagenum - 1) * PAGESIZE; + } + else + { + request->par.readWrite.pages[0].offset = pagenum * PAGESIZE; + } + request->par.readWrite.numberOfPages = 1; + + if (!forward(file,request)) { + // Something went wrong + ndbout << "Could not forward write request" << endl; + theRequestPool->put(request); + return 1; + } + return 0; + +} + +int writeSyncFile( int fileNum, int pagenum) +{ + AsyncFile* file = openFiles[fileNum]; +#ifdef TESTDEBUG + ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl; +#endif + Request *request = theRequestPool->get(); + request->action = Request::writeSync; + request->error = 0; + request->set(NDBFS_REF, pagenum, fileNum); + request->file = openFiles[fileNum]; + + // Write only one page, choose the correct page for each file using fileNum + request->par.readWrite.pages[0].buf = &WritePages[fileNum][0]; + request->par.readWrite.pages[0].size = PAGESIZE; + request->par.readWrite.pages[0].offset = pagenum * PAGESIZE; + request->par.readWrite.numberOfPages = 1; + + if (!forward(file,request)) { + // Something went wrong + ndbout << "Could not forward write request" << endl; + theRequestPool->put(request); + return 1; + } + return 0; + +} + +int readFile( int fileNum, int pagenum) +{ + AsyncFile* file = openFiles[fileNum]; +#ifdef TESTDEBUG + ndbout << "readFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl; +#endif + Request *request = theRequestPool->get(); + request->action = Request::read; + request->error = 0; + request->set(NDBFS_REF, pagenum, fileNum); + request->file = openFiles[fileNum]; + + // Read only one page, choose the correct page for each file using fileNum + request->par.readWrite.pages[0].buf = &ReadPages[fileNum][0]; + request->par.readWrite.pages[0].size = PAGESIZE; + request->par.readWrite.pages[0].offset = pagenum * PAGESIZE; + request->par.readWrite.numberOfPages = 1; + + if (!forward(file,request)) { + // Something went wrong + ndbout << "Could not forward read request" << endl; + theRequestPool->put(request); + return 1; + } + return 0; + +} + +int openFileWait() +{ + int openedFiles = 0; + while (openedFiles < numberOfFiles) + { + Request* request = theReportChannel->readChannel(); + if (request) + { + if (request->action == Request::open) + { + if (request->error ==0) + { +#ifdef TESTDEBUG + ndbout << "Opened file " << request->file->fileName().c_str() << endl; +#endif + openFiles[request->theFilePointer] = request->file; + } + else + { + ndbout << "error while opening file" << endl; + exit(1); + } + theRequestPool->put(request); + openedFiles++; + } + else + { + ndbout << "Unexpected request received" << endl; + } + } + else + { + ndbout << "Nothing read from theReportChannel" << endl; + } + } + return 0; +} + +int closeFileWait() +{ + int closedFiles = 0; + while (closedFiles < numberOfFiles) + { + Request* request = theReportChannel->readChannel(); + if (request) + { + if (request->action == Request::close || request->action == Request::closeRemove) + { + if (request->error ==0) + { +#ifdef TESTDEBUG + ndbout << "Closed file " << request->file->fileName().c_str() << endl; +#endif + openFiles[request->theFilePointer] = NULL; + files->put(request->file); + } + else + { + ndbout << "error while closing file" << endl; + exit(1); + } + theRequestPool->put(request); + closedFiles++; + } + else + { + ndbout << "Unexpected request received" << endl; + } + } + else + { + ndbout << "Nothing read from theReportChannel" << endl; + } + } + return 0; +} + +int writeFileWait() +{ + Request* request = theReportChannel->readChannel(); + if (request) + { + if (request->action == Request::write) + { + if (request->error == 0) + { +#ifdef TESTDEBUG + ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl; +#endif + + } + else + { + ndbout << "error while writing file, error=" << request->error << endl; + exit(1); + } + theRequestPool->put(request); + } + else + { + ndbout << "Unexpected request received" << endl; + } + } + else + { + ndbout << "Nothing read from theReportChannel" << endl; + } + return 0; +} + +int writeSyncFileWait() +{ + Request* request = theReportChannel->readChannel(); + if (request) + { + if (request->action == Request::writeSync) + { + if (request->error == 0) + { +#ifdef TESTDEBUG + ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl; +#endif + + } + else + { + ndbout << "error while writing file" << endl; + exit(1); + } + theRequestPool->put(request); + } + else + { + ndbout << "Unexpected request received" << endl; + } + } + else + { + ndbout << "Nothing read from theReportChannel" << endl; + } + return 0; +} + +int readFileWait() +{ + Request* request = theReportChannel->readChannel(); + if (request) + { + if (request->action == Request::read) + { + if (request->error == 0) + { +#ifdef TESTDEBUG + ndbout << "readFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl; +#endif + if (memcmp(&(ReadPages[request->theFilePointer][0]), &(WritePages[request->theFilePointer][0]), PAGESIZE)!=0) + { + ndbout <<"Verification error!" << endl; + for (int i = 0; i < PAGESIZE; i++ ){ + ndbout <<" Compare Page " << i << " : " << ReadPages[request->theFilePointer][i] <<", " <<WritePages[request->theFilePointer][i] << endl;; + if( ReadPages[request->theFilePointer][i] !=WritePages[request->theFilePointer][i]) + + exit(1); + } + } + + } + else + { + ndbout << "error while reading file" << endl; + exit(1); + } + theRequestPool->put(request); + } + else + { + ndbout << "Unexpected request received" << endl; + } + } + else + { + ndbout << "Nothing read from theReportChannel" << endl; + } + return 0; +} + +int readArguments(int argc, const char** argv) +{ + + int i = 1; + while (argc > 1) + { + if (strcmp(argv[i], "-n") == 0) + { + numberOfFiles = atoi(argv[i+1]); + if ((numberOfFiles < 1) || (numberOfFiles > MAXFILES)) + { + ndbout << "Wrong number of files, default = "<<DEFAULT_NUM_FILES << endl; + numberOfFiles = DEFAULT_NUM_FILES; + } + } + else if (strcmp(argv[i], "-r") == 0) + { + numberOfRequests = atoi(argv[i+1]); + if ((numberOfRequests < 1) || (numberOfRequests > MAXREQUESTS)) + { + ndbout << "Wrong number of requests, default = "<<DEFAULT_NUM_REQUESTS << endl; + numberOfRequests = DEFAULT_NUM_REQUESTS; + } + } + else if (strcmp(argv[i], "-s") == 0) + { + fileSize = atoi(argv[i+1]); + if ((fileSize < 1) || (fileSize > MAXFILESIZE)) + { + ndbout << "Wrong number of 8k pages, default = "<<DEFAULT_FILESIZE << endl; + fileSize = DEFAULT_FILESIZE; + } + } + else if (strcmp(argv[i], "-l") == 0) + { + numberOfIterations = atoi(argv[i+1]); + if ((numberOfIterations < 1)) + { + ndbout << "Wrong number of iterations, default = 1" << endl; + numberOfIterations = 1; + } + } + else if (strcmp(argv[i], "-remove") == 0) + { + removeFiles = 1; + argc++; + i--; + } + else if (strcmp(argv[i], "-reverse") == 0) + { + ndbout << "Writing files reversed" << endl; + writeFilesReverse = 1; + argc++; + i--; + } + + argc -= 2; + i = i + 2; + } + + if ((fileSize % numberOfRequests)!= 0) + { + numberOfRequests = numberOfRequests - (fileSize % numberOfRequests); + ndbout <<"numberOfRequest must be modulo of filesize" << endl; + ndbout << "New numberOfRequest="<<numberOfRequests<<endl; + } + return 0; +} + + +// Needed for linking... + +void ErrorReporter::handleError(ErrorCategory type, int messageID, + const char* problemData, const char* objRef, NdbShutdownType stype) +{ + + ndbout << "ErrorReporter::handleError activated" << endl; + ndbout << "type= " << type << endl; + ndbout << "messageID= " << messageID << endl; + ndbout << "problemData= " << problemData << endl; + ndbout << "objRef= " << objRef << endl; + + exit(1); +} + +void ErrorReporter::handleAssert(const char* message, const char* file, int line) +{ + ndbout << "ErrorReporter::handleAssert activated" << endl; + ndbout << "message= " << message << endl; + ndbout << "file= " << file << endl; + ndbout << "line= " << line << endl; + exit(1); +} + + +GlobalData globalData; + + +Signal::Signal() +{ + +} + diff --git a/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/Makefile b/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/Makefile new file mode 100644 index 00000000000..b0356e6da68 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/Makefile @@ -0,0 +1,27 @@ +include .defs.mk + +TYPE := kernel + +BIN_TARGET := aftest +BIN_TARGET_ARCHIVES := ndbfs portlib trace signaldataprint + +SOURCES = AsyncFileTest.cpp + +CFLAGS_AsyncFileTest.cpp = -I../ + +include $(NDB_TOP)/Epilogue.mk + + +# run basic tests +run_test : + $(NDB_TOP)/bin/$(BIN_TARGET) + $(NDB_TOP)/bin/$(BIN_TARGET) -n 8 -r 8 -l 10 -remove + $(NDB_TOP)/bin/$(BIN_TARGET) -n 8 -r 8 -l 10 -reverse -remove + $(NDB_TOP)/bin/$(BIN_TARGET) -n 8 -r 8 -l 10 -s 512 -remove + $(NDB_TOP)/bin/$(BIN_TARGET) -n 8 -r 4 -l 1000 + + + + + + diff --git a/ndb/src/kernel/blocks/ndbfs/CircularIndex.cpp b/ndb/src/kernel/blocks/ndbfs/CircularIndex.cpp new file mode 100644 index 00000000000..30b40097c9b --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/CircularIndex.cpp @@ -0,0 +1,20 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "CircularIndex.hpp" + + + diff --git a/ndb/src/kernel/blocks/ndbfs/CircularIndex.hpp b/ndb/src/kernel/blocks/ndbfs/CircularIndex.hpp new file mode 100644 index 00000000000..349cccdbcb4 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/CircularIndex.hpp @@ -0,0 +1,116 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CircularIndex_H +#define CircularIndex_H + +//=========================================================================== +// +// .DESCRIPTION +// Building block for circular buffers. It increment as a normal index. +// untill it it becomes the maximum size then it becomes zero. +// +// .TYPICAL USE: +// to implement a circular buffer. +// +// .EXAMPLE: +// See MemoryChannel.C +//=========================================================================== + +/////////////////////////////////////////////////////////////////////////////// +// CircularIndex( int start= 0,int size=256 ); +// Constuctor +// Parameters: +// start: where to start to index +// size : range of the index, will be from 0 to size-1 +/////////////////////////////////////////////////////////////////////////////// +// operator int (); +// returns the index +/////////////////////////////////////////////////////////////////////////////// +// void operator ++ (); +// increments the index with one, of size is reached it is set to zero +/////////////////////////////////////////////////////////////////////////////// +// friend int full( const CircularIndex& write, const CircularIndex& read ); +// Taken the write index and the read index from a buffer it is calculated +// if the buffer is full +// Parameters: +// write: index used a write index for the buffer +// read : index used a read index for the buffer +// return +// 0 : not full +// 1 : full +/////////////////////////////////////////////////////////////////////////////// +// friend int empty( const CircularIndex& write, const CircularIndex& read ); +// Taken the write index and the read index from a buffer it is calculated +// if the buffer is empty +// Parameters: +// write: index used a write index for the buffer +// read : index used a read index for the buffer +// return +// 0 : not empty +// 1 : empty +/////////////////////////////////////////////////////////////////////////////// + +class CircularIndex +{ +public: + inline CircularIndex( int start= 0,int size=256 ); + operator int (); + CircularIndex& operator ++ (); + friend int full( const CircularIndex& write, const CircularIndex& read ); + friend int empty( const CircularIndex& write, const CircularIndex& read ); +private: + int theSize; + int theIndex; +}; + +inline CircularIndex::operator int () +{ + return theIndex; +} + +inline CircularIndex& CircularIndex::operator ++ () +{ + ++theIndex; + if( theIndex >= theSize ){ + theIndex= 0; + } + return *this; +} + + +inline int full( const CircularIndex& write, const CircularIndex& read ) +{ + int readTmp= read.theIndex; + + if( read.theIndex < write.theIndex ) + readTmp += read.theSize; + + return ( readTmp - write.theIndex) == 1; +} + +inline int empty( const CircularIndex& write, const CircularIndex& read ) +{ + return read.theIndex == write.theIndex; +} + + +inline CircularIndex::CircularIndex( int start,int size ): + theSize(size), + theIndex(start) +{ +} +#endif diff --git a/ndb/src/kernel/blocks/ndbfs/Filename.cpp b/ndb/src/kernel/blocks/ndbfs/Filename.cpp new file mode 100644 index 00000000000..98ff7c7e4e4 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/Filename.cpp @@ -0,0 +1,220 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <stdlib.h> +#include <string.h> +#include <NdbStdio.h> +#include <NdbUnistd.h> +#include <NdbOut.hpp> + +#include "Filename.hpp" +#include "ErrorHandlingMacros.hpp" +#include "Error.hpp" +#include "RefConvert.hpp" +#include "DebuggerNames.hpp" + +#include <signaldata/FsOpenReq.hpp> + +static const char* fileExtension[] = { + ".Data", + ".FragLog", + ".LocLog", + ".FragList", + ".TableList", + ".SchemaLog", + ".sysfile", + ".log", + ".ctl" +}; + +static const Uint32 noOfExtensions = sizeof(fileExtension)/sizeof(char*); + +Filename::Filename() : + theLevelDepth(0) +{ +} + +void +Filename::init(const char * pFileSystemPath){ + if (pFileSystemPath == NULL) { + ERROR_SET(fatal, AFS_ERROR_NOPATH, ""," Filename::init()"); + return; + } + + strncpy(theBaseDirectory, pFileSystemPath, PATH_MAX); + + // the environment variable is set, + // check that it is pointing on a valid directory + // + char buf2[PATH_MAX]; memset(buf2, 0,sizeof(buf2)); +#ifdef NDB_WIN32 + char* szFilePart; + if(!GetFullPathName(theBaseDirectory, sizeof(buf2), buf2, &szFilePart) + || (::GetFileAttributes(theBaseDirectory)&FILE_ATTRIBUTE_READONLY)) +#else + if((::realpath(theBaseDirectory, buf2) == NULL)|| + (::access(theBaseDirectory, W_OK) != 0)) +#endif + { + ERROR_SET(fatal, AFS_ERROR_INVALIDPATH, pFileSystemPath, " Filename::init()"); + } + strncpy(theBaseDirectory, buf2, sizeof(theBaseDirectory)); + // path seems ok, add delimiter if missing + if (strcmp(&theBaseDirectory[strlen(theBaseDirectory) - 1], + DIR_SEPARATOR) != 0) + strcat(theBaseDirectory, DIR_SEPARATOR); + +} + + +Filename::~Filename(){ +} + +void +Filename::set(BlockReference blockReference, + const Uint32 filenumber[4], bool dir) +{ + char buf[PATH_MAX]; + theLevelDepth = 0; + strncpy(theName, theBaseDirectory, PATH_MAX); + + const Uint32 type = FsOpenReq::getSuffix(filenumber); + const Uint32 version = FsOpenReq::getVersion(filenumber); + switch(version){ + case 1 :{ + const Uint32 diskNo = FsOpenReq::v1_getDisk(filenumber); + const Uint32 table = FsOpenReq::v1_getTable(filenumber); + const Uint32 frag = FsOpenReq::v1_getFragment(filenumber); + const Uint32 S_val = FsOpenReq::v1_getS(filenumber); + const Uint32 P_val = FsOpenReq::v1_getP(filenumber); + + if (diskNo < 0xff){ + snprintf(buf, sizeof(buf), "D%d%s", diskNo, DIR_SEPARATOR); + strcat(theName, buf); + theLevelDepth++; + } + + { + const char* blockName = getBlockName( refToBlock(blockReference) ); + if (blockName == NULL){ + ERROR_SET(ecError, AFS_ERROR_PARAMETER,"","No Block Name"); + return; + } + snprintf(buf, sizeof(buf), "%s%s", blockName, DIR_SEPARATOR); + strcat(theName, buf); + theLevelDepth++; + } + + if (table < 0xffffffff){ + snprintf(buf, sizeof(buf), "T%d%s", table, DIR_SEPARATOR); + strcat(theName, buf); + theLevelDepth++; + } + + if (frag < 0xffffffff){ + snprintf(buf, sizeof(buf), "F%d%s", frag, DIR_SEPARATOR); + strcat(theName, buf); + theLevelDepth++; + } + + + if (S_val < 0xffffffff){ + snprintf(buf, sizeof(buf), "S%d", S_val); + strcat(theName, buf); + } + + if (P_val < 0xff){ + snprintf(buf, sizeof(buf), "P%d", P_val); + strcat(theName, buf); + } + + } + break; + case 2:{ + const Uint32 seq = FsOpenReq::v2_getSequence(filenumber); + const Uint32 nodeId = FsOpenReq::v2_getNodeId(filenumber); + const Uint32 count = FsOpenReq::v2_getCount(filenumber); + + snprintf(buf, sizeof(buf), "BACKUP%sBACKUP-%d%s", + DIR_SEPARATOR, seq, DIR_SEPARATOR); + strcat(theName, buf); + if(count == 0xffffffff) { + snprintf(buf, sizeof(buf), "BACKUP-%d.%d", + seq, nodeId); strcat(theName, buf); + } else { + snprintf(buf, sizeof(buf), "BACKUP-%d-%d.%d", + seq, count, nodeId); strcat(theName, buf); + } + theLevelDepth = 2; + break; + } + break; + case 3:{ + const Uint32 diskNo = FsOpenReq::v1_getDisk(filenumber); + + if(diskNo == 0xFF){ + ERROR_SET(ecError, AFS_ERROR_PARAMETER,"","Invalid disk specification"); + } + + snprintf(buf, sizeof(buf), "D%d%s", diskNo, DIR_SEPARATOR); + strcat(theName, buf); + theLevelDepth++; + } + break; + default: + ERROR_SET(ecError, AFS_ERROR_PARAMETER,"","Wrong version"); + } + if (type >= noOfExtensions){ + ERROR_SET(ecError, AFS_ERROR_PARAMETER,"","File Type doesn't exist"); + return; + } + strcat(theName, fileExtension[type]); + + if(dir == true){ + for(Uint32 l = strlen(theName) - 1; l >= 0; l--){ + if(theName[l] == DIR_SEPARATOR[0]){ + theName[l] = 0; + break; + } + } + } +} + +/** + * Find out directory name on level + * Ex: + * theName = "/tmp/fs/T0/NDBFS/D0/P0/S27.data" + * level = 1 + * would return "/tmp/fs/T0/NDBFS/ + */ +const char* Filename::directory(int level) +{ + const char* p; + + p = theName; + p += strlen(theBaseDirectory); + + for (int i = 0; i <= level; i++){ + p = strstr(p, DIR_SEPARATOR); + p++; + } + + strncpy(theDirectory, theName, p - theName - 1); + theDirectory[p-theName-1] = 0; + return theDirectory; +} + + diff --git a/ndb/src/kernel/blocks/ndbfs/Filename.hpp b/ndb/src/kernel/blocks/ndbfs/Filename.hpp new file mode 100644 index 00000000000..4c3569b5485 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/Filename.hpp @@ -0,0 +1,97 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef Filename_H +#define Filename_H + +//=========================================================================== +// +// .DESCRIPTION +// Takes a 128 bits value (done as a array of four longs) and +// makes a filename out of it acording the following schema +// Bits 0-31 T +// Bits 32-63 F +// Bits 64-95 S +// Bits 96-103 P +// Bits 104-111 D +// Bits 112-119 File Type +// Bits 120-127 Version number of Filename +// +// T, is used to find/create a directory. If T = 0xFFFF then the +// file is on top level. In that case the F is of no relevance. +// F, same as T. +// S, is used to find/create a filename. If S= 0xFFFF then it is ignored. +// P, same as S +// D, is used to find/create the root directory, this is the +// directory before the blockname. If D= 0xFF then it is ignored. +// File Type +// 0 => .Data +// 1 => .FragLog +// 2 => .LocLog +// 3 => .FragList +// 4 => .TableList +// 5 => .SchemaLog +// 6 => .sysfile +// 15=> ignored +// Version number of Filename, current version is 0x1, must be +// used for the this style of options. +// +// +//=========================================================================== + +#include <kernel_types.h> +#include <NdbUnistd.h> + +class Filename +{ +public: + // filenumber is 64 bits but is split in to 4 32bits words + Filename(); + ~Filename(); + void set(BlockReference blockReference, + const Uint32 filenumber[4], bool dir = false); + const char* baseDirectory() const; + const char* directory(int level); + int levels() const; + const char* c_str() const; + + void init(const char * fileSystemPath); + +private: + int theLevelDepth; + char theName[PATH_MAX]; + char theBaseDirectory[PATH_MAX]; + char theDirectory[PATH_MAX]; +}; + +// inline methods +inline const char* Filename::c_str() const{ + return theName; +} + +inline const char* Filename::baseDirectory() const{ + return theBaseDirectory; +} + +inline int Filename::levels() const{ + return theLevelDepth; +} + +#endif + + + + diff --git a/ndb/src/kernel/blocks/ndbfs/Makefile b/ndb/src/kernel/blocks/ndbfs/Makefile new file mode 100644 index 00000000000..58e1458bf16 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/Makefile @@ -0,0 +1,14 @@ +include .defs.mk + +TYPE := kernel + +ARCHIVE_TARGET := ndbfs + +SOURCES = \ + AsyncFile.cpp \ + Ndbfs.cpp VoidFs.cpp \ + Filename.cpp \ + CircularIndex.cpp + +include $(NDB_TOP)/Epilogue.mk + diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannel.cpp b/ndb/src/kernel/blocks/ndbfs/MemoryChannel.cpp new file mode 100644 index 00000000000..a1aebdef7a1 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannel.cpp @@ -0,0 +1,18 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +//#include "MemoryChannel.hpp" + diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp b/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp new file mode 100644 index 00000000000..6e0c2721ca0 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp @@ -0,0 +1,168 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef MemoryChannel_H +#define MemoryChannel_H + +//=========================================================================== +// +// .DESCRIPTION +// Pointer based communication channel for communication between two +// thread. It does not copy any data in or out the channel so the +// item that is put in can not be used untill the other thread has +// given it back. There is no support for detecting the return of a +// item. The channel is half-duplex. +// For comminication between 1 writer and 1 reader use the MemoryChannel +// class, for comminication between multiple writer and 1 reader use the +// MemoryChannelMultipleWriter. There is no support for multiple readers. +// +// .TYPICAL USE: +// to communicate between threads. +// +// .EXAMPLE: +// See AsyncFile.C +//=========================================================================== +// +// +// MemoryChannel( int size= 256); +// Constuctor +// Parameters: +// size : amount of pointer it can hold +// +// void operator ++ (); +// increments the index with one, if size is reached it is set to zero +// +// virtual void write( T *t); +// Puts the item in the channel if the channel is full an error is reported. +// Parameters: +// t: pointer to item to put in the channel, after this the item +// is shared with the other thread. +// errors +// AFS_ERROR_CHANNALFULL, channel is full +// +// T* read(); +// Reads a itemn from the channel, if channel is empty it blocks untill +// an item can be read. +// return +// T : item from the channel +// +// T* tryRead(); +// Reads a item from the channel, if channel is empty it returns zero. +// return +// T : item from the channel or zero if channel is empty. +// + +#if defined NDB_OSE || defined NDB_SOFTOSE +#include "MemoryChannelOSE.hpp" +#else + +#include "ErrorHandlingMacros.hpp" +#include "Error.hpp" +#include "CircularIndex.hpp" +#include "NdbMutex.h" +#include "NdbCondition.h" +#include <NdbOut.hpp> + +#include <assert.h> + +template <class T> +class MemoryChannel +{ +public: + MemoryChannel( int size= 256); + virtual ~MemoryChannel( ); + + virtual void writeChannel( T *t); + T* readChannel(); + T* tryReadChannel(); + +private: + int theSize; + T **theChannel; + CircularIndex theWriteIndex; + CircularIndex theReadIndex; + NdbMutex* theMutexPtr; + NdbCondition* theConditionPtr; + +}; + + +template <class T> MemoryChannel<T>::MemoryChannel( int size): + theSize(size), + theChannel(new T*[size] ), + theWriteIndex(0, size), + theReadIndex(0, size) +{ + theMutexPtr = NdbMutex_Create(); + theConditionPtr = NdbCondition_Create(); +} + +template <class T> MemoryChannel<T>::~MemoryChannel( ) +{ + NdbMutex_Destroy(theMutexPtr); + NdbCondition_Destroy(theConditionPtr); + delete [] theChannel; +} + +template <class T> void MemoryChannel<T>::writeChannel( T *t) +{ + + NdbMutex_Lock(theMutexPtr); + REQUIRE(!full(theWriteIndex, theReadIndex), "Memory Channel Full"); + REQUIRE(theChannel != NULL, "Memory Channel Full"); + theChannel[theWriteIndex]= t; + ++theWriteIndex; + NdbMutex_Unlock(theMutexPtr); + NdbCondition_Signal(theConditionPtr); +} + + +template <class T> T* MemoryChannel<T>::readChannel() +{ + T* tmp; + + NdbMutex_Lock(theMutexPtr); + while ( empty(theWriteIndex, theReadIndex) ) + { + NdbCondition_Wait(theConditionPtr, + theMutexPtr); + } + + tmp= theChannel[theReadIndex]; + ++theReadIndex; + NdbMutex_Unlock(theMutexPtr); + return tmp; +} + +template <class T> T* MemoryChannel<T>::tryReadChannel() +{ + T* tmp= 0; + NdbMutex_Lock(theMutexPtr); + NdbCondition_WaitTimeout(theConditionPtr, + theMutexPtr, 0); + if ( !empty(theWriteIndex, theReadIndex) ) + { + tmp= theChannel[theReadIndex]; + ++theReadIndex; + } + NdbMutex_Unlock(theMutexPtr); + return tmp; +} + +#endif + +#endif // MemoryChannel_H + diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannelOSE.hpp b/ndb/src/kernel/blocks/ndbfs/MemoryChannelOSE.hpp new file mode 100644 index 00000000000..9f70efcadf7 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannelOSE.hpp @@ -0,0 +1,205 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef MemoryChannelOSE_H +#define MemoryChannelOSE_H + +//=========================================================================== +// +// .DESCRIPTION +// Pointer based communication channel for communication between two +// thread. It sends the pointer to the other signal via an OSE signal +// +// .TYPICAL USE: +// to communicate between threads. +// +// .EXAMPLE: +// See AsyncFile.C +//=========================================================================== +// +// +// MemoryChannel( int size= 256); +// Constuctor +// Parameters: +// size : is ignored in OSE version +// +// void operator ++ (); +// increments the index with one, if size is reached it is set to zero +// +// virtual void write( T *t); +// Puts the item in the channel if the channel is full an error is reported. +// Parameters: +// t: pointer to item to put in the channel, after this the item +// is shared with the other thread. +// errors +// AFS_ERROR_CHANNALFULL, channel is full +// +// T* read(); +// Reads a itemn from the channel, if channel is empty it blocks untill +// an item can be read. +// return +// T : item from the channel +// +// T* tryRead(); +// Reads a item from the channel, if channel is empty it returns zero. +// return +// T : item from the channel or zero if channel is empty. +// + +#include <ose.h> +#include "ErrorHandlingMacros.hpp" +#include "Error.hpp" +#include "NdbMutex.h" +#include "NdbCondition.h" + +#include <assert.h> + + + + +template <class T> +class MemoryChannel +{ +public: + MemoryChannel( int size= 256); + virtual ~MemoryChannel( ); + + virtual void writeChannel( T *t); + T* readChannel(); + T* tryReadChannel(); + +private: + PROCESS theReceiverPid; +}; + +template <class T> class MemoryChannelMultipleWriter:public MemoryChannel<T> +{ +public: + MemoryChannelMultipleWriter( int size= 256); + ~MemoryChannelMultipleWriter( ); + void writeChannel( T *t); + +private: +}; + + +#define MEMCHANNEL_SIGBASE 5643 + +#define MEMCHANNEL_SIGNAL (MEMCHANNEL_SIGBASE + 1) /* !-SIGNO(struct MemChannelSignal)-! */ + + +struct MemChannelSignal +{ + SIGSELECT sigNo; + void* ptr; +}; + +union SIGNAL +{ + SIGSELECT sigNo; + struct MemChannelSignal memChanSig; +}; + +template <class T> MemoryChannel<T>::MemoryChannel( int size ) +{ + // Default receiver for this channel is the creating process + theReceiverPid = current_process(); +} + +template <class T> MemoryChannel<T>::~MemoryChannel( ) +{ +} + +template <class T> void MemoryChannel<T>::writeChannel( T *t) +{ + union SIGNAL* sig; + + sig = alloc(sizeof(struct MemChannelSignal), MEMCHANNEL_SIGNAL); + ((struct MemChannelSignal*)sig)->ptr = t; + send(&sig, theReceiverPid); +} + + +template <class T> T* MemoryChannel<T>::readChannel() +{ + T* tmp; + + static const SIGSELECT sel_mem[] = {1, MEMCHANNEL_SIGNAL}; + union SIGNAL* sig; + + tmp = NULL; /* Default value */ + + sig = receive((SIGSELECT*)sel_mem); + if (sig != NIL){ + if (sig->sigNo == MEMCHANNEL_SIGNAL){ + tmp = (T*)(((struct MemChannelSignal*)sig)->ptr); + }else{ + assert(1==0); + } + free_buf(&sig); + } + + return tmp; +} + +template <class T> T* MemoryChannel<T>::tryReadChannel() +{ + T* tmp; + + static const SIGSELECT sel_mem[] = {1, MEMCHANNEL_SIGNAL}; + union SIGNAL* sig; + + tmp = NULL; /* Default value */ + + sig = receive_w_tmo(0, (SIGSELECT*)sel_mem); + if (sig != NIL){ + if (sig->sigNo == MEMCHANNEL_SIGNAL){ + tmp = (T*)(((struct MemChannelSignal*)sig)->ptr); + }else{ + assert(1==0); + } + free_buf(&sig); + } + + return tmp; +} + + +#endif // MemoryChannel_H + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/Makefile b/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/Makefile new file mode 100644 index 00000000000..68f71bfc4cd --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/Makefile @@ -0,0 +1,13 @@ +include .defs.mk + +TYPE := kernel + +BIN_TARGET := mctest +BIN_TARGET_ARCHIVES := portlib + +SOURCES = MemoryChannelTest.cpp + +CFLAGS_MemoryChannelTest.cpp = -I../ + +include $(NDB_TOP)/Epilogue.mk + diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/MemoryChannelTest.cpp b/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/MemoryChannelTest.cpp new file mode 100644 index 00000000000..aeab9f7828d --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/MemoryChannelTest.cpp @@ -0,0 +1,197 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "MemoryChannel.hpp" +#include "NdbThread.h" +#include "NdbSleep.h" +#include "NdbOut.hpp" +#include "NdbMain.h" + + + +MemoryChannel<int>* theMemoryChannel; + + +extern "C" void* runProducer(void*arg) +{ + // The producer will items into the MemoryChannel + int count = *(int*)arg; + int* p; + int i = 0; + while (i <= count) + { + p = new int(i); + ndbout << "P: " << *p << endl; + theMemoryChannel->writeChannel(p); + if (i%5==0) + NdbSleep_MilliSleep(i); + i++; + } + NdbThread_Exit(0); + return NULL; +} + +extern "C" void* runConsumer(void* arg) +{ + // The producer will read items from MemoryChannel and print on screen + int count = *(int*)arg; + int* p; + int i = 0; + while (i < count) + { + p = theMemoryChannel->readChannel(); + ndbout << "C: " << *p << endl; + i = *p; + delete p; + + } + NdbThread_Exit(0); + return NULL; +} + + + +class ArgStruct +{ +public: + ArgStruct(int _items, int _no){ + items=_items; + no=_no; + }; + int items; + int no; +}; + +MemoryChannelMultipleWriter<ArgStruct>* theMemoryChannel2; + +extern "C" void* runProducer2(void*arg) +{ + // The producer will items into the MemoryChannel + ArgStruct* pArg = (ArgStruct*)arg; + int count = pArg->items; + ArgStruct* p; + int i = 0; + while (i < count) + { + p = new ArgStruct(i, pArg->no); + ndbout << "P"<<pArg->no<<": " << i << endl; + theMemoryChannel2->writeChannel(p); + NdbSleep_MilliSleep(i); + i++; + } + NdbThread_Exit(0); + return NULL; +} + +extern "C" void* runConsumer2(void* arg) +{ + // The producer will read items from MemoryChannel and print on screen + ArgStruct* pArg = (ArgStruct*)arg; + int count = pArg->items * pArg->no; + ArgStruct* p; + int i = 0; + while (i < count) + { + p = theMemoryChannel2->readChannel(); + ndbout << "C: "<< p->no << ", " << p->items << endl; + i++; + delete p; + } + ndbout << "Consumer2: " << count << " received" << endl; + NdbThread_Exit(0); + return NULL; +} + + + + +//#if defined MEMORYCHANNELTEST + +//int main(int argc, char **argv) +NDB_COMMAND(mctest, "mctest", "mctest", "Test the memory channel used in Ndb", 32768) +{ + + ndbout << "==== testing MemoryChannel ====" << endl; + + theMemoryChannel = new MemoryChannel<int>; + theMemoryChannel2 = new MemoryChannelMultipleWriter<ArgStruct>; + + NdbThread* consumerThread; + NdbThread* producerThread; + + NdbThread_SetConcurrencyLevel(2); + + int numItems = 100; + producerThread = NdbThread_Create(runProducer, + (void**)&numItems, + 4096, + (char*)"producer"); + + consumerThread = NdbThread_Create(runConsumer, + (void**)&numItems, + 4096, + (char*)"consumer"); + + + void *status; + NdbThread_WaitFor(consumerThread, &status); + NdbThread_WaitFor(producerThread, &status); + + ndbout << "==== testing MemoryChannelMultipleWriter ====" << endl; +#define NUM_THREADS2 5 + NdbThread_SetConcurrencyLevel(NUM_THREADS2+2); + NdbThread* producerThreads[NUM_THREADS2]; + + ArgStruct *pArg; + for (int j = 0; j < NUM_THREADS2; j++) + { + char buf[25]; + sprintf((char*)&buf, "producer%d", j); + pArg = new ArgStruct(numItems, j); + producerThreads[j] = NdbThread_Create(runProducer2, + (void**)pArg, + 4096, + (char*)&buf); + } + + pArg = new ArgStruct(numItems, NUM_THREADS2); + consumerThread = NdbThread_Create(runConsumer2, + (void**)pArg, + 4096, + (char*)"consumer"); + + + NdbThread_WaitFor(consumerThread, &status); + for (int j = 0; j < NUM_THREADS2; j++) + { + NdbThread_WaitFor(producerThreads[j], &status); + } + + + return 0; + +} + +void ErrorReporter::handleError(ErrorCategory type, int messageID, + const char* problemData, const char* objRef, + NdbShutdownType nst) +{ + + ndbout << "ErrorReporter::handleError activated" << endl; + exit(1); +} + +//#endif diff --git a/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp b/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp new file mode 100644 index 00000000000..8992a2104e9 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp @@ -0,0 +1,1008 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <limits.h> +#include <errno.h> + +#include "Ndbfs.hpp" +#include "AsyncFile.hpp" +#include "Filename.hpp" +#include "Error.hpp" + +#include <signaldata/FsOpenReq.hpp> +#include <signaldata/FsCloseReq.hpp> +#include <signaldata/FsReadWriteReq.hpp> +#include <signaldata/FsAppendReq.hpp> +#include <signaldata/FsRemoveReq.hpp> +#include <signaldata/FsConf.hpp> +#include <signaldata/FsRef.hpp> +#include <signaldata/NdbfsContinueB.hpp> +#include <signaldata/DumpStateOrd.hpp> + +#include <RefConvert.hpp> +#include <NdbSleep.h> +#include <NdbOut.hpp> +#include <Configuration.hpp> + +#define DEBUG(x) { ndbout << "FS::" << x << endl; } + +inline +int pageSize( const NewVARIABLE* baseAddrRef ) +{ + int log_psize; + int log_qsize = baseAddrRef->bits.q; + int log_vsize = baseAddrRef->bits.v; + if (log_vsize < 3) + log_vsize = 3; + log_psize = log_qsize + log_vsize - 3; + return (1 << log_psize); +} + + +Ndbfs::Ndbfs(const Configuration & conf) : + SimulatedBlock(NDBFS, conf), + scanningInProgress(false), + theLastId(0), + m_maxOpenedFiles(0) +{ + theFileSystemPath = conf.fileSystemPath(); + theRequestPool = new Pool<Request>; + + const Properties * p = conf.getOwnProperties(); + ndbrequire(p != 0); + + ndbrequire(p->get("MaxNoOfOpenFiles", &m_maxFiles)); + + // Create idle AsyncFiles + Uint32 noIdleFiles = 16; + for (Uint32 i = 0; i < noIdleFiles; i++){ + theIdleFiles.push_back(createAsyncFile()); + } + + BLOCK_CONSTRUCTOR(Ndbfs); + + // Set received signals + addRecSignal(GSN_DUMP_STATE_ORD, &Ndbfs::execDUMP_STATE_ORD); + addRecSignal(GSN_STTOR, &Ndbfs::execSTTOR); + addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ); + addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ); + addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ); + addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ); + addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ); + addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB); + addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ); + addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ); + // Set send signals +} + +Ndbfs::~Ndbfs() +{ + // Delete all files + // AsyncFile destuctor will take care of deleting + // the thread it has created + for (unsigned i = 0; i < theFiles.size(); i++){ + AsyncFile* file = theFiles[i]; + delete file; + theFiles[i] = NULL; + }//for + theFiles.clear(); + + delete theRequestPool; +} + +/* Received a restart signal. + * Answer it like any other block + * PR0 : StartCase + * DR0 : StartPhase + * DR1 : ? + * DR2 : ? + * DR3 : ? + * DR4 : ? + * DR5 : SignalKey + */ +void +Ndbfs::execSTTOR(Signal* signal) +{ + jamEntry(); + + if(signal->theData[1] == 0){ // StartPhase 0 + jam(); + cownref = NDBFS_REF; + // close all open files + ndbrequire(theOpenFiles.size() == 0); + + scanningInProgress = false; + + signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY; + sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 1); + + signal->theData[3] = 255; + sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB); + return; + } + ndbrequire(0); +} + +int +Ndbfs::forward( AsyncFile * file, Request* request) +{ + jam(); + file->execute(request); + return 1; +} + +void +Ndbfs::execFSOPENREQ(Signal* signal) +{ + jamEntry(); + const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0]; + const BlockReference userRef = fsOpenReq->userReference; + AsyncFile* file = getIdleFile(); + ndbrequire(file != NULL); + ndbrequire(signal->getLength() == FsOpenReq::SignalLength) + file->theFileName.set( userRef, fsOpenReq->fileNumber); + file->reportTo(&theFromThreads); + + Request* request = theRequestPool->get(); + request->action = Request::open; + request->error = 0; + request->par.open.flags = fsOpenReq->fileFlags; + request->set(userRef, fsOpenReq->userPointer, newId() ); + request->file = file; + request->theTrace = signal->getTrace(); + + ndbrequire(forward(file, request)); +} + +void +Ndbfs::execFSREMOVEREQ(Signal* signal) +{ + jamEntry(); + const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr(); + const BlockReference userRef = req->userReference; + AsyncFile* file = getIdleFile(); + ndbrequire(file != NULL); + + file->theFileName.set( userRef, req->fileNumber, req->directory); + file->reportTo(&theFromThreads); + + Request* request = theRequestPool->get(); + request->action = Request::rmrf; + request->par.rmrf.directory = req->directory; + request->par.rmrf.own_directory = req->ownDirectory; + request->error = 0; + request->set(userRef, req->userPointer, newId() ); + request->file = file; + request->theTrace = signal->getTrace(); + + ndbrequire(forward(file, request)); +} + +/* + * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1 + * remove file + */ +void +Ndbfs::execFSCLOSEREQ(Signal * signal) +{ + jamEntry(); + const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0]; + const BlockReference userRef = fsCloseReq->userReference; + const Uint16 filePointer = (Uint16)fsCloseReq->filePointer; + const UintR userPointer = fsCloseReq->userPointer; + + AsyncFile* openFile = theOpenFiles.find(filePointer); + if (openFile == NULL) { + // The file was not open, send error back to sender + jam(); + // Initialise FsRef signal + FsRef * const fsRef = (FsRef *)&signal->theData[0]; + fsRef->userPointer = userPointer; + fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist); + fsRef->osErrorCode = ~0; // Indicate local error + sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB); + return; + } + + Request *request = theRequestPool->get(); + if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) { + jam(); + request->action = Request::closeRemove; + } else { + jam(); + request->action = Request::close; + } + request->set(userRef, fsCloseReq->userPointer, filePointer); + request->file = openFile; + request->error = 0; + request->theTrace = signal->getTrace(); + + ndbrequire(forward(openFile, request)); +} + +void +Ndbfs::readWriteRequest(int action, Signal * signal) +{ + const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0]; + Uint16 filePointer = (Uint16)fsRWReq->filePointer; + const UintR userPointer = fsRWReq->userPointer; + const BlockReference userRef = fsRWReq->userReference; + const BlockNumber blockNumber = refToBlock(userRef); + + AsyncFile* openFile = theOpenFiles.find(filePointer); + + const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex]; + unsigned int tPageSize; + unsigned int tClusterSize; + unsigned int tNRR; + unsigned int tPageOffset; + char* tWA; + FsRef::NdbfsErrorCodeType errorCode; + + Request *request = theRequestPool->get(); + request->error = 0; + request->set(userRef, userPointer, filePointer); + request->file = openFile; + request->action = (Request::Action) action; + request->theTrace = signal->getTrace(); + + if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed + jam(); + errorCode = FsRef::fsErrInvalidParameters; + goto error; + } + + if (fsRWReq->varIndex >= getBatSize(blockNumber)) { + jam();// Ensure that a valid variable is used + errorCode = FsRef::fsErrInvalidParameters; + goto error; + } + if (myBaseAddrRef == NULL) { + jam(); // Ensure that a valid variable is used + errorCode = FsRef::fsErrInvalidParameters; + goto error; + } + if (openFile == NULL) { + jam(); //file not open + errorCode = FsRef::fsErrFileDoesNotExist; + goto error; + } + tPageSize = pageSize(myBaseAddrRef); + tClusterSize = myBaseAddrRef->ClusterSize; + tNRR = myBaseAddrRef->nrr; + tWA = (char*)myBaseAddrRef->WA; + + switch (fsRWReq->getFormatFlag(fsRWReq->operationFlag)) { + + // List of memory and file pages pairs + case FsReadWriteReq::fsFormatListOfPairs: { + jam(); + for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) { + jam(); + const Uint32 varIndex = fsRWReq->data.listOfPair[i].varIndex; + const Uint32 fileOffset = fsRWReq->data.listOfPair[i].fileOffset; + if (varIndex >= tNRR) { + jam(); + errorCode = FsRef::fsErrInvalidParameters; + goto error; + }//if + request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize]; + request->par.readWrite.pages[i].size = tPageSize; + request->par.readWrite.pages[i].offset = fileOffset * tPageSize; + }//for + request->par.readWrite.numberOfPages = fsRWReq->numberOfPages; + break; + }//case + + // Range of memory page with one file page + case FsReadWriteReq::fsFormatArrayOfPages: { + if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) { + jam(); + errorCode = FsRef::fsErrInvalidParameters; + goto error; + }//if + const Uint32 varIndex = fsRWReq->data.arrayOfPages.varIndex; + const Uint32 fileOffset = fsRWReq->data.arrayOfPages.fileOffset; + + request->par.readWrite.pages[0].offset = fileOffset * tPageSize; + request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages; + request->par.readWrite.numberOfPages = 1; + request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize]; + break; + }//case + + // List of memory pages followed by one file page + case FsReadWriteReq::fsFormatListOfMemPages: { + + tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages]; + tPageOffset *= tPageSize; + + for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) { + jam(); + Uint32 varIndex = fsRWReq->data.listOfMemPages.varIndex[i]; + + if (varIndex >= tNRR) { + jam(); + errorCode = FsRef::fsErrInvalidParameters; + goto error; + }//if + request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize]; + request->par.readWrite.pages[i].size = tPageSize; + request->par.readWrite.pages[i].offset = tPageOffset + (i*tPageSize); + }//for + request->par.readWrite.numberOfPages = fsRWReq->numberOfPages; + break; + // make it a writev or readv + }//case + + default: { + jam(); + errorCode = FsRef::fsErrInvalidParameters; + goto error; + }//default + + }//switch + + ndbrequire(forward(openFile, request)); + return; + +error: + theRequestPool->put(request); + FsRef * const fsRef = (FsRef *)&signal->theData[0]; + fsRef->userPointer = userPointer; + fsRef->setErrorCode(fsRef->errorCode, errorCode); + fsRef->osErrorCode = ~0; // Indicate local error + switch (action) { + case Request:: write: + case Request:: writeSync: { + jam(); + sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB); + break; + }//case + case Request:: read: { + jam(); + sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB); + }//case + }//switch + return; +} + +/* + PR0: File Pointer , theData[0] + DR0: User reference, theData[1] + DR1: User Pointer, etc. + DR2: Flag + DR3: Var number + DR4: amount of pages + DR5->: Memory Page id and File page id according to Flag +*/ +void +Ndbfs::execFSWRITEREQ(Signal* signal) +{ + jamEntry(); + const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0]; + + if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){ + jam(); + readWriteRequest( Request::writeSync, signal ); + } else { + jam(); + readWriteRequest( Request::write, signal ); + } +} + +/* + PR0: File Pointer + DR0: User reference + DR1: User Pointer + DR2: Flag + DR3: Var number + DR4: amount of pages + DR5->: Memory Page id and File page id according to Flag +*/ +void +Ndbfs::execFSREADREQ(Signal* signal) +{ + jamEntry(); + readWriteRequest( Request::read, signal ); +} + +/* + * PR0: File Pointer DR0: User reference DR1: User Pointer + */ +void +Ndbfs::execFSSYNCREQ(Signal * signal) +{ + jamEntry(); + Uint16 filePointer = (Uint16)signal->theData[0]; + BlockReference userRef = signal->theData[1]; + const UintR userPointer = signal->theData[2]; + AsyncFile* openFile = theOpenFiles.find(filePointer); + + if (openFile == NULL) { + jam(); //file not open + FsRef * const fsRef = (FsRef *)&signal->theData[0]; + fsRef->userPointer = userPointer; + fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist); + fsRef->osErrorCode = ~0; // Indicate local error + sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB); + return; + } + + Request *request = theRequestPool->get(); + request->error = 0; + request->action = Request::sync; + request->set(userRef, userPointer, filePointer); + request->file = openFile; + request->theTrace = signal->getTrace(); + + ndbrequire(forward(openFile,request)); +} + +void +Ndbfs::execFSAPPENDREQ(Signal * signal) +{ + const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0]; + const Uint16 filePointer = (Uint16)fsReq->filePointer; + const UintR userPointer = fsReq->userPointer; + const BlockReference userRef = fsReq->userReference; + const BlockNumber blockNumber = refToBlock(userRef); + + FsRef::NdbfsErrorCodeType errorCode; + + AsyncFile* openFile = theOpenFiles.find(filePointer); + const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex]; + + const Uint32* tWA = (const Uint32*)myBaseAddrRef->WA; + const Uint32 tSz = myBaseAddrRef->nrr; + const Uint32 offset = fsReq->offset; + const Uint32 size = fsReq->size; + Request *request = theRequestPool->get(); + + if (openFile == NULL) { + jam(); + errorCode = FsRef::fsErrFileDoesNotExist; + goto error; + } + + if (myBaseAddrRef == NULL) { + jam(); // Ensure that a valid variable is used + errorCode = FsRef::fsErrInvalidParameters; + goto error; + } + + if (fsReq->varIndex >= getBatSize(blockNumber)) { + jam();// Ensure that a valid variable is used + errorCode = FsRef::fsErrInvalidParameters; + goto error; + } + + if(offset + size > tSz){ + jam(); // Ensure that a valid variable is used + errorCode = FsRef::fsErrInvalidParameters; + goto error; + } + + request->error = 0; + request->set(userRef, userPointer, filePointer); + request->file = openFile; + request->action = Request::append; + request->theTrace = signal->getTrace(); + + request->par.append.buf = (const char *)(tWA + offset); + request->par.append.size = size << 2; + + ndbrequire(forward(openFile, request)); + return; + +error: + jam(); + theRequestPool->put(request); + FsRef * const fsRef = (FsRef *)&signal->theData[0]; + fsRef->userPointer = userPointer; + fsRef->setErrorCode(fsRef->errorCode, errorCode); + fsRef->osErrorCode = ~0; // Indicate local error + + jam(); + sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB); + return; +} + +Uint16 +Ndbfs::newId() +{ + // finds a new key, eg a new filepointer + for (int i = 1; i < SHRT_MAX; i++) + { + if (theLastId == SHRT_MAX) { + jam(); + theLastId = 1; + } else { + jam(); + theLastId++; + } + + if(theOpenFiles.find(theLastId) == NULL) { + jam(); + return theLastId; + } + } + ndbrequire(1 == 0); + // The program will not reach this point + return 0; +} + +AsyncFile* +Ndbfs::createAsyncFile(){ + + // Check limit of open files + if (theFiles.size()+1 == m_maxFiles) { + // Print info about all open files + for (unsigned i = 0; i < theFiles.size(); i++){ + AsyncFile* file = theFiles[i]; + ndbout_c("%2d (0x%x): %s", i, file, file->isOpen()?"OPEN":"CLOSED"); + } + ERROR_SET(fatal, AFS_ERROR_MAXOPEN,""," Ndbfs::createAsyncFile"); + } + + AsyncFile* file = new AsyncFile; + file->doStart(theFileSystemPath); + + // Put the file in list of all files + theFiles.push_back(file); + +#ifdef VM_TRACE + infoEvent("NDBFS: Created new file thread %d", theFiles.size()); +#endif + + return file; +} + +AsyncFile* +Ndbfs::getIdleFile(){ + AsyncFile* file; + if (theIdleFiles.size() > 0){ + file = theIdleFiles[0]; + theIdleFiles.erase(0); + } else { + file = createAsyncFile(); + } + return file; +} + + + +void +Ndbfs::report(Request * request, Signal* signal) +{ + const Uint32 orgTrace = signal->getTrace(); + signal->setTrace(request->theTrace); + const BlockReference ref = request->theUserReference; + if (request->error) { + jam(); + // Initialise FsRef signal + FsRef * const fsRef = (FsRef *)&signal->theData[0]; + fsRef->userPointer = request->theUserPointer; + fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error)); + fsRef->osErrorCode = request->error; + + switch (request->action) { + case Request:: open: { + jam(); + // Put the file back in idle files list + theIdleFiles.push_back(request->file); + sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB); + break; + } + case Request:: closeRemove: + case Request:: close: { + jam(); + sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB); + break; + } + case Request:: writeSync: + case Request:: writevSync: + case Request:: write: + case Request:: writev: { + jam(); + sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB); + break; + } + case Request:: read: + case Request:: readv: { + jam(); + sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB); + break; + } + case Request:: sync: { + jam(); + sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB); + break; + } + case Request::append: { + jam(); + sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB); + break; + } + case Request::rmrf: { + jam(); + // Put the file back in idle files list + theIdleFiles.push_back(request->file); + sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB); + break; + } + + case Request:: end: { + // Report nothing + break; + } + }//switch + } else { + jam(); + FsConf * const fsConf = (FsConf *)&signal->theData[0]; + fsConf->userPointer = request->theUserPointer; + switch (request->action) { + case Request:: open: { + jam(); + theOpenFiles.insert(request->file, request->theFilePointer); + + // Keep track on max number of opened files + if (theOpenFiles.size() > m_maxOpenedFiles) + m_maxOpenedFiles = theOpenFiles.size(); + + fsConf->filePointer = request->theFilePointer; + sendSignal(ref, GSN_FSOPENCONF, signal, 3, JBB); + break; + } + case Request:: closeRemove: + case Request:: close: { + jam(); + // removes the file from OpenFiles list + theOpenFiles.erase(request->theFilePointer); + // Put the file in idle files list + theIdleFiles.push_back(request->file); + sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB); + break; + } + case Request:: writeSync: + case Request:: writevSync: + case Request:: write: + case Request:: writev: { + jam(); + sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBB); + break; + } + case Request:: read: + case Request:: readv: { + jam(); + sendSignal(ref, GSN_FSREADCONF, signal, 1, JBB); + break; + } + case Request:: sync: { + jam(); + sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB); + break; + }//case + case Request::append: { + jam(); + signal->theData[1] = request->par.append.size; + sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB); + break; + } + case Request::rmrf: { + jam(); + // Put the file in idle files list + theIdleFiles.push_back(request->file); + sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB); + break; + } + case Request:: end: { + // Report nothing + break; + } + } + }//if + signal->setTrace(orgTrace); +} + + +bool +Ndbfs::scanIPC(Signal* signal) +{ + Request* request = theFromThreads.tryReadChannel(); + jam(); + if (request) { + jam(); + report(request, signal); + theRequestPool->put(request); + return &request; + } + return false; +} + +#if defined NDB_WIN32 +int Ndbfs::translateErrno(int aErrno) +{ + switch (aErrno) + { + //permission denied + case ERROR_ACCESS_DENIED: + + return FsRef::fsErrPermissionDenied; + //temporary not accessible + case ERROR_PATH_BUSY: + case ERROR_NO_MORE_SEARCH_HANDLES: + + return FsRef::fsErrTemporaryNotAccessible; + //no space left on device + case ERROR_HANDLE_DISK_FULL: + case ERROR_DISK_FULL: + + return FsRef::fsErrNoSpaceLeftOnDevice; + //none valid parameters + case ERROR_INVALID_HANDLE: + case ERROR_INVALID_DRIVE: + case ERROR_INVALID_ACCESS: + case ERROR_HANDLE_EOF: + case ERROR_BUFFER_OVERFLOW: + + return FsRef::fsErrInvalidParameters; + //environment error + case ERROR_CRC: + case ERROR_ARENA_TRASHED: + case ERROR_BAD_ENVIRONMENT: + case ERROR_INVALID_BLOCK: + case ERROR_WRITE_FAULT: + case ERROR_READ_FAULT: + case ERROR_OPEN_FAILED: + + return FsRef::fsErrEnvironmentError; + + //no more process resources + case ERROR_TOO_MANY_OPEN_FILES: + case ERROR_NOT_ENOUGH_MEMORY: + case ERROR_OUTOFMEMORY: + return FsRef::fsErrNoMoreResources; + //no file + case ERROR_FILE_NOT_FOUND: + return FsRef::fsErrFileDoesNotExist; + + case ERR_ReadUnderflow: + return FsRef::fsErrReadUnderflow; + + default: + return FsRef::fsErrUnknown; + } +} +#elif defined NDB_OSE || defined NDB_SOFTOSE +int Ndbfs::translateErrno(int aErrno) +{ + switch (aErrno) + { + //permission denied + case EACCES: + case EROFS: + case ENXIO: + return FsRef::fsErrPermissionDenied; + //temporary not accessible + case EAGAIN: + case ETIMEDOUT: + case ENOLCK: + return FsRef::fsErrTemporaryNotAccessible; + //no space left on device + case ENFILE: + case EDQUOT: + case ENOSPC: + return FsRef::fsErrNoSpaceLeftOnDevice; + //none valid parameters + case EINVAL: + case EFBIG: + case EBADF: + case ENAMETOOLONG: + case EFAULT: + case EISDIR: + return FsRef::fsErrInvalidParameters; + //environment error + case EMLINK: + case ELOOP: + return FsRef::fsErrEnvironmentError; + + //no more process resources + case EMFILE: + case ENOMEM: + return FsRef::fsErrNoMoreResources; + //no file + case ENOENT: + return FsRef::fsErrFileDoesNotExist; + + case ERR_ReadUnderflow: + return FsRef::fsErrReadUnderflow; + + default: + return FsRef::fsErrUnknown; + } +} +#else +int Ndbfs::translateErrno(int aErrno) +{ + switch (aErrno) + { + //permission denied + case EACCES: + case EROFS: + case ENXIO: + return FsRef::fsErrPermissionDenied; + //temporary not accessible + case EAGAIN: + case ETIMEDOUT: + case ENOLCK: + case EINTR: + case EIO: + return FsRef::fsErrTemporaryNotAccessible; + //no space left on device + case ENFILE: + case EDQUOT: +#ifndef NDB_MACOSX + case ENOSR: +#endif + case ENOSPC: + case EFBIG: + return FsRef::fsErrNoSpaceLeftOnDevice; + //none valid parameters + case EINVAL: + case EBADF: + case ENAMETOOLONG: + case EFAULT: + case EISDIR: + case ENOTDIR: + case EEXIST: + case ETXTBSY: + return FsRef::fsErrInvalidParameters; + //environment error + case ELOOP: +#ifndef NDB_MACOSX + case ENOLINK: + case EMULTIHOP: +#endif +#ifndef NDB_LINUX + case EOPNOTSUPP: + case ESPIPE: +#endif + case EPIPE: + return FsRef::fsErrEnvironmentError; + + //no more process resources + case EMFILE: + case ENOMEM: + return FsRef::fsErrNoMoreResources; + //no file + case ENOENT: + return FsRef::fsErrFileDoesNotExist; + + case ERR_ReadUnderflow: + return FsRef::fsErrReadUnderflow; + + default: + return FsRef::fsErrUnknown; + } +} +#endif + + + +void +Ndbfs::execCONTINUEB(Signal* signal) +{ + jamEntry(); + if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) { + jam(); + + // Also send CONTINUEB to ourself in order to scan for + // incoming answers from AsyncFile on MemoryChannel theFromThreads + signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY; + sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1); + if (scanningInProgress == true) { + jam(); + return; + } + } + if (scanIPC(signal)) { + jam(); + scanningInProgress = true; + signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY; + sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB); + } else { + jam(); + scanningInProgress = false; + } + return; +} + +bool Global_useO_SYNC = false; +bool Global_useO_DIRECT = false; +bool Global_unlinkO_CREAT = false; +Uint32 Global_syncFreq = 1024 * 1024; + +void +Ndbfs::execDUMP_STATE_ORD(Signal* signal) +{ + if(signal->theData[0] == 19){ + if(signal->length() > 1){ + Global_useO_SYNC = signal->theData[1]; + } + if(signal->length() > 2){ + Global_syncFreq = signal->theData[2] * 1024 * 1024; + } + if(signal->length() > 3){ + Global_unlinkO_CREAT = signal->theData[3]; + } + if(signal->length() > 4){ + Global_useO_DIRECT = signal->theData[4]; + } + ndbout_c("useO_SYNC = %d syncFreq = %d unlinkO_CREATE = %d O_DIRECT = %d", + Global_useO_SYNC, + Global_syncFreq, + Global_unlinkO_CREAT, + Global_useO_DIRECT); + return; + } + if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){ + infoEvent("NDBFS: Files: %d Open files: %d", + theFiles.size(), + theOpenFiles.size()); + infoEvent(" Idle files: %d Max opened files: %d", + theIdleFiles.size(), + m_maxOpenedFiles); + infoEvent(" Max files: %d", + m_maxFiles); + infoEvent(" Requests: %d", + theRequestPool->size()); + + return; + } + if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){ + infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size()); + + for (unsigned i = 0; i < theOpenFiles.size(); i++){ + AsyncFile* file = theOpenFiles.getFile(i); + infoEvent("%2d (0x%x): %s", i,file, file->theFileName.c_str()); + } + return; + } + if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){ + infoEvent("NDBFS: Dump all files: %d", theFiles.size()); + + for (unsigned i = 0; i < theFiles.size(); i++){ + AsyncFile* file = theFiles[i]; + infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED"); + } + return; + } + if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){ + infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size()); + + for (unsigned i = 0; i < theIdleFiles.size(); i++){ + AsyncFile* file = theIdleFiles[i]; + infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED"); + } + return; + } +}//Ndbfs::execDUMP_STATE_ORD() + + + +BLOCK_FUNCTIONS(Ndbfs); + diff --git a/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp b/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp new file mode 100644 index 00000000000..080196a9ea5 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp @@ -0,0 +1,126 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef SIMBLOCKASYNCFILESYSTEM_H +#define SIMBLOCKASYNCFILESYSTEM_H + +#include <pc.hpp> +#include <SimulatedBlock.hpp> +#include "Pool.hpp" +#include "AsyncFile.hpp" +#include "OpenFiles.hpp" + + + +// Because one NDB Signal request can result in multiple requests to +// AsyncFile one class must be made responsible to keep track +// of all out standing request and when all are finished the result +// must be reported to the sending block. + + +class Ndbfs : public SimulatedBlock +{ +public: + Ndbfs(const class Configuration & conf); + virtual ~Ndbfs(); + +protected: + BLOCK_DEFINES(Ndbfs); + + // The signal processing functions + void execDUMP_STATE_ORD(Signal* signal); + void execFSOPENREQ(Signal* signal); + void execFSCLOSEREQ(Signal* signal); + void execFSWRITEREQ(Signal* signal); + void execFSREADREQ(Signal* signal); + void execFSSYNCREQ(Signal* signal); + void execFSAPPENDREQ(Signal* signal); + void execFSREMOVEREQ(Signal* signal); + void execSTTOR(Signal* signal); + void execCONTINUEB(Signal* signal); + + bool scanningInProgress; + Uint16 newId(); + +private: + int forward(AsyncFile *file, Request* Request); + void report(Request* request, Signal* signal); + bool scanIPC(Signal* signal); + + // Declared but not defined + Ndbfs(Ndbfs & ); + void operator = (Ndbfs &); + + // Used for uniqe number generation + Uint16 theLastId; + BlockReference cownref; + + // Communication from files + MemoryChannel<Request> theFromThreads; + + Pool<Request>* theRequestPool; + + AsyncFile* createAsyncFile(); + AsyncFile* getIdleFile(); + + Vector<AsyncFile*> theFiles; // List all created AsyncFiles + Vector<AsyncFile*> theIdleFiles; // List of idle AsyncFiles + OpenFiles theOpenFiles; // List of open AsyncFiles + const char * theFileSystemPath; + + // Statistics variables + Uint32 m_maxOpenedFiles; + + // Limit for max number of AsyncFiles created + Uint32 m_maxFiles; + + void readWriteRequest( int action, Signal * signal ); + + static int translateErrno(int aErrno); +}; + +class VoidFs : public SimulatedBlock +{ +public: + VoidFs(const class Configuration & conf); + virtual ~VoidFs(); + +protected: + BLOCK_DEFINES(VoidFs); + + // The signal processing functions + void execDUMP_STATE_ORD(Signal* signal); + void execFSOPENREQ(Signal* signal); + void execFSCLOSEREQ(Signal* signal); + void execFSWRITEREQ(Signal* signal); + void execFSREADREQ(Signal* signal); + void execFSSYNCREQ(Signal* signal); + void execFSAPPENDREQ(Signal* signal); + void execFSREMOVEREQ(Signal* signal); + void execSTTOR(Signal* signal); + +private: + // Declared but not defined + VoidFs(VoidFs & ); + void operator = (VoidFs &); + + // Used for uniqe number generation + Uint32 c_maxFileNo; +}; + +#endif + + diff --git a/ndb/src/kernel/blocks/ndbfs/OpenFiles.hpp b/ndb/src/kernel/blocks/ndbfs/OpenFiles.hpp new file mode 100644 index 00000000000..b944bb5485b --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/OpenFiles.hpp @@ -0,0 +1,114 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef OPENFILES_H +#define OPENFILES_H + +#include <Vector.hpp> + +class OpenFiles +{ +public: + OpenFiles(){ } + + /* Get a pointer to the file with id */ + AsyncFile* find(Uint16 id); + /* Insert file with id */ + bool insert(AsyncFile* file, Uint16 id); + /* Erase file with id */ + bool erase(Uint16 id); + /* Get number of open files */ + unsigned size(); + + Uint16 getId(unsigned i); + AsyncFile* getFile(unsigned i); + + +private: + + class OpenFileItem { + public: + OpenFileItem(): m_file(NULL), m_id(0){}; + + AsyncFile* m_file; + Uint16 m_id; + }; + + Vector<OpenFileItem> m_files; +}; + + +//***************************************************************************** +inline AsyncFile* OpenFiles::find(Uint16 id){ + for (unsigned i = 0; i < m_files.size(); i++){ + if (m_files[i].m_id == id){ + return m_files[i].m_file; + } + } + return NULL; +} + +//***************************************************************************** +inline bool OpenFiles::erase(Uint16 id){ + for (unsigned i = 0; i < m_files.size(); i++){ + if (m_files[i].m_id == id){ + m_files.erase(i); + return true; + } + } + // Item was not found in list + return false; +} + + +//***************************************************************************** +inline bool OpenFiles::insert(AsyncFile* file, Uint16 id){ + // Check if file has already been opened + for (unsigned i = 0; i < m_files.size(); i++){ + if(m_files[i].m_file == NULL) + continue; + + if(strcmp(m_files[i].m_file->theFileName.c_str(), + file->theFileName.c_str()) == 0){ + ERROR_SET(fatal, AFS_ERROR_ALLREADY_OPEN,"","OpenFiles::insert()"); + } + } + + // Insert the file into vector + OpenFileItem openFile; + openFile.m_id = id; + openFile.m_file = file; + m_files.push_back(openFile); + + return true; +} + +//***************************************************************************** +inline Uint16 OpenFiles::getId(unsigned i){ + return m_files[i].m_id; +} + +//***************************************************************************** +inline AsyncFile* OpenFiles::getFile(unsigned i){ + return m_files[i].m_file; +} + +//***************************************************************************** +inline unsigned OpenFiles::size(){ + return m_files.size(); +} + +#endif diff --git a/ndb/src/kernel/blocks/ndbfs/Pool.hpp b/ndb/src/kernel/blocks/ndbfs/Pool.hpp new file mode 100644 index 00000000000..a26fa730727 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/Pool.hpp @@ -0,0 +1,262 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef FOR_LIB_POOL_H +#define FOR_LIB_POOL_H + + +//=========================================================================== +// +// .PUBLIC +// +//=========================================================================== + +//////////////////////////////////////////////////////////////// +// +// enum { defInitSize = 256, defIncSize = 64 }; +// Description: type to store initial and incremental size in. +// +//////////////////////////////////////////////////////////////// +// +// Pool(int anInitSize = defInitSize, int anIncSize = defIncSize); +// Description: +// Constructor. Allocates anInitSize of objects <template argument>. +// When the pool runs out of elements, anIncSize elements are added to the +// pool. (When the pool is not optimized to allocate multiple elements +// more efficient, the anIncSize MUST be set to 1 to get the best +// performance... +// +// Parameters: +// defInitSize: Initial size of the pool (# of elements in the pool) +// defIncSize: # of elements added to the pool when a request to an empty +// pool is made. +// Return value: +// _ +// Errors: +// - +// Asserts: +// _ +// +//////////////////////////////////////////////////////////////// +// +// virtual ~Pool(); +// Description: +// Elements in the pool are all deallocated. +// Parameters: +// _ +// Return value: +// _ +// Errors: +// - +// Asserts: +// theEmptyNodeList==0. No elements are in still in use. +// +//////////////////////////////////////////////////////////////// +// +// T& get(); +// Description: +// get's an element from the Pool. +// Parameters: +// _ +// Return value: +// T& the element extracted from the Pool. (element must be cleared to +// mimick newly created element) +// Errors: +// - +// Asserts: +// _ +// +//////////////////////////////////////////////////////////////// +// +// void put(T& aT); +// Description: +// Returns an element to the pool. +// Parameters: +// aT The element to put back in the pool +// Return value: +// void +// Errors: +// - +// Asserts: +// The pool has "empty" elements, to put element back in... +// +//=========================================================================== +// +// .PRIVATE +// +//=========================================================================== + +//////////////////////////////////////////////////////////////// +// +// void allocate(int aSize); +// Description: +// add aSize elements to the pool +// Parameters: +// aSize: # of elements to add to the pool +// Return value: +// void +// Errors: +// - +// Asserts: +// _ +// +//////////////////////////////////////////////////////////////// +// +// void deallocate(); +// Description: +// frees all elements kept in the pool. +// Parameters: +// _ +// Return value: +// void +// Errors: +// - +// Asserts: +// No elements are "empty" i.e. in use. +// +//=========================================================================== +// +// .PRIVATE +// +//=========================================================================== + +//////////////////////////////////////////////////////////////// +// +// Pool<T>& operator=(const Pool<T>& cp); +// Description: +// Prohibit use of assignement operator. +// Parameters: +// cp +// Return value: +// Pool<T>& +// Asserts: +// _ +// +//////////////////////////////////////////////////////////////// +// +// Pool(const Pool<T>& cp); +// Description: +// Prohibit use of default copy constructor. +// Parameters: +// cp +// Return value: +// _ +// Errors: +// - +// Asserts: +// _ +// +//////////////////////////////////////////////////////////////// +// +// int initSize; +// Description: size of the initial size of the pool +// +//////////////////////////////////////////////////////////////// +// +// int incSize; +// Description: # of elements added to the pool when pool is exhausted. +// +//////////////////////////////////////////////////////////////// +// +// PoolElement<T>* theFullNodeList; +// Description: List to contain all "unused" elements in the pool +// +//////////////////////////////////////////////////////////////// +// +// PoolElement<T>* theEmptyNodeList; +// Description: List to contain all "in use" elements in the pool +// +//------------------------------------------------------------------------- + +template <class T> +class Pool +{ +public: + enum { defInitSize = 256, defIncSize = 64 }; + + Pool(int anInitSize = defInitSize, int anIncSize = defIncSize) : + theIncSize(anIncSize), + theTop(0), + theCurrentSize(0), + theList(0) + { + allocate(anInitSize); + } + + virtual ~Pool(void) + { + for (int i=0; i <theTop ; ++i) + delete theList[i]; + + delete []theList; + } + + T* get(); + void put(T* aT); + + unsigned size(){ return theTop; }; + +protected: + void allocate(int aSize) + { + T** tList = theList; + int i; + theList = new T*[aSize+theCurrentSize]; + REQUIRE(theList != 0, "Allocate in Pool.hpp failed"); + // allocate full list + for (i = 0; i < theTop; i++) { + theList[i] = tList[i]; + } + delete []tList; + for (; (theTop < aSize); theTop++){ + theList[theTop] = (T*)new T; + } + theCurrentSize += aSize; + } + +private: + Pool<T>& operator=(const Pool<T>& cp); + Pool(const Pool<T>& cp); + + int theIncSize; + int theTop; + int theCurrentSize; + + T** theList; +}; + +//****************************************************************************** +template <class T> inline T* Pool<T>::get() +{ + T* tmp; + if( theTop == 0 ) + { + allocate(theIncSize); + } + --theTop; + tmp = theList[theTop]; + return tmp; +} + +// +//****************************************************************************** +template <class T> inline void Pool<T>::put(T* aT) +{ + theList[theTop]= aT; + ++theTop; +} + +#endif diff --git a/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp b/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp new file mode 100644 index 00000000000..d3407e8d4e7 --- /dev/null +++ b/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp @@ -0,0 +1,200 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <limits.h> +#include <errno.h> + +#include "Ndbfs.hpp" +#include "AsyncFile.hpp" +#include "Filename.hpp" +#include "Error.hpp" + +#include <signaldata/FsOpenReq.hpp> +#include <signaldata/FsCloseReq.hpp> +#include <signaldata/FsReadWriteReq.hpp> +#include <signaldata/FsAppendReq.hpp> +#include <signaldata/FsRemoveReq.hpp> +#include <signaldata/FsConf.hpp> +#include <signaldata/FsRef.hpp> +#include <signaldata/NdbfsContinueB.hpp> +#include <signaldata/DumpStateOrd.hpp> + +#include <RefConvert.hpp> +#include <NdbSleep.h> +#include <NdbOut.hpp> +#include <Configuration.hpp> + +#define DEBUG(x) { ndbout << "FS::" << x << endl; } + +VoidFs::VoidFs(const Configuration & conf) : + SimulatedBlock(NDBFS, conf) +{ + BLOCK_CONSTRUCTOR(VoidFs); + + // Set received signals + addRecSignal(GSN_DUMP_STATE_ORD, &VoidFs::execDUMP_STATE_ORD); + addRecSignal(GSN_STTOR, &VoidFs::execSTTOR); + addRecSignal(GSN_FSOPENREQ, &VoidFs::execFSOPENREQ); + addRecSignal(GSN_FSCLOSEREQ, &VoidFs::execFSCLOSEREQ); + addRecSignal(GSN_FSWRITEREQ, &VoidFs::execFSWRITEREQ); + addRecSignal(GSN_FSREADREQ, &VoidFs::execFSREADREQ); + addRecSignal(GSN_FSSYNCREQ, &VoidFs::execFSSYNCREQ); + addRecSignal(GSN_FSAPPENDREQ, &VoidFs::execFSAPPENDREQ); + addRecSignal(GSN_FSREMOVEREQ, &VoidFs::execFSREMOVEREQ); + // Set send signals +} + +VoidFs::~VoidFs() +{ +} + +void +VoidFs::execSTTOR(Signal* signal) +{ + jamEntry(); + + if(signal->theData[1] == 0){ // StartPhase 0 + jam(); + signal->theData[3] = 255; + sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 4, JBB); + return; + } + ndbrequire(0); +} + +void +VoidFs::execFSOPENREQ(Signal* signal) +{ + jamEntry(); + const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0]; + const BlockReference userRef = fsOpenReq->userReference; + const Uint32 userPointer = fsOpenReq->userPointer; + + Uint32 flags = fsOpenReq->fileFlags; + if(flags == FsOpenReq::OM_READONLY){ + // Initialise FsRef signal + FsRef * const fsRef = (FsRef *)&signal->theData[0]; + fsRef->userPointer = userPointer; + fsRef->errorCode = FsRef::fsErrFileDoesNotExist; + fsRef->osErrorCode = ~0; + sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB); + return; + } + + if(flags & FsOpenReq::OM_WRITEONLY || flags & FsOpenReq::OM_READWRITE){ + signal->theData[0] = userPointer; + signal->theData[1] = c_maxFileNo++; + sendSignal(userRef, GSN_FSOPENCONF, signal, 2, JBB); + } +} + +void +VoidFs::execFSREMOVEREQ(Signal* signal) +{ + jamEntry(); + const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr(); + const Uint32 userRef = req->userReference; + const Uint32 userPointer = req->userPointer; + + signal->theData[0] = userPointer; + sendSignal(userRef, GSN_FSREMOVECONF, signal, 1, JBB); +} + +/* + * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1 + * remove file + */ +void +VoidFs::execFSCLOSEREQ(Signal * signal) +{ + jamEntry(); + + const FsCloseReq * const req = (FsCloseReq *)signal->getDataPtr(); + const Uint32 userRef = req->userReference; + const Uint32 userPointer = req->userPointer; + + signal->theData[0] = userPointer; + sendSignal(userRef, GSN_FSCLOSECONF, signal, 1, JBB); +} + +void +VoidFs::execFSWRITEREQ(Signal* signal) +{ + jamEntry(); + const FsReadWriteReq * const req = (FsReadWriteReq *)signal->getDataPtr(); + const Uint32 userRef = req->userReference; + const Uint32 userPointer = req->userPointer; + + signal->theData[0] = userPointer; + sendSignal(userRef, GSN_FSWRITECONF, signal, 1, JBB); +} + +void +VoidFs::execFSREADREQ(Signal* signal) +{ + jamEntry(); + + const FsReadWriteReq * const req = (FsReadWriteReq *)signal->getDataPtr(); + const Uint32 userRef = req->userReference; + const Uint32 userPointer = req->userPointer; + + signal->theData[0] = userPointer; + sendSignal(userRef, GSN_FSREADCONF, signal, 1, JBB); +#if 0 + FsRef * const fsRef = (FsRef *)&signal->theData[0]; + fsRef->userPointer = userPointer; + fsRef->errorCode = FsRef::fsErrEnvironmentError; + fsRef->osErrorCode = ~0; // Indicate local error + sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB); +#endif +} + +void +VoidFs::execFSSYNCREQ(Signal * signal) +{ + jamEntry(); + + BlockReference userRef = signal->theData[1]; + const UintR userPointer = signal->theData[2]; + + signal->theData[0] = userPointer; + sendSignal(userRef, GSN_FSSYNCCONF, signal, 1, JBB); + + return; +} + +void +VoidFs::execFSAPPENDREQ(Signal * signal) +{ + const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0]; + const UintR userPointer = fsReq->userPointer; + const BlockReference userRef = fsReq->userReference; + const Uint32 size = fsReq->size; + + signal->theData[0] = userPointer; + signal->theData[1] = size << 2; + sendSignal(userRef, GSN_FSAPPENDCONF, signal, 2, JBB); +} + +void +VoidFs::execDUMP_STATE_ORD(Signal* signal) +{ +}//VoidFs::execDUMP_STATE_ORD() + + + +BLOCK_FUNCTIONS(VoidFs); + |