summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks/ndbfs
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/kernel/blocks/ndbfs')
-rw-r--r--ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp1050
-rw-r--r--ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp234
-rw-r--r--ndb/src/kernel/blocks/ndbfs/AsyncFileTest/AsyncFileTest.cpp697
-rw-r--r--ndb/src/kernel/blocks/ndbfs/AsyncFileTest/Makefile27
-rw-r--r--ndb/src/kernel/blocks/ndbfs/CircularIndex.cpp20
-rw-r--r--ndb/src/kernel/blocks/ndbfs/CircularIndex.hpp116
-rw-r--r--ndb/src/kernel/blocks/ndbfs/Filename.cpp220
-rw-r--r--ndb/src/kernel/blocks/ndbfs/Filename.hpp97
-rw-r--r--ndb/src/kernel/blocks/ndbfs/Makefile14
-rw-r--r--ndb/src/kernel/blocks/ndbfs/MemoryChannel.cpp18
-rw-r--r--ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp168
-rw-r--r--ndb/src/kernel/blocks/ndbfs/MemoryChannelOSE.hpp205
-rw-r--r--ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/Makefile13
-rw-r--r--ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/MemoryChannelTest.cpp197
-rw-r--r--ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp1008
-rw-r--r--ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp126
-rw-r--r--ndb/src/kernel/blocks/ndbfs/OpenFiles.hpp114
-rw-r--r--ndb/src/kernel/blocks/ndbfs/Pool.hpp262
-rw-r--r--ndb/src/kernel/blocks/ndbfs/VoidFs.cpp200
19 files changed, 4786 insertions, 0 deletions
diff --git a/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp b/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
new file mode 100644
index 00000000000..0e2aa4c6903
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
@@ -0,0 +1,1050 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/**
+ * O_DIRECT
+ */
+#ifdef NDB_LINUX
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#endif
+
+#include "Error.hpp"
+#include "AsyncFile.hpp"
+
+#include <ErrorHandlingMacros.hpp>
+#include <kernel_types.h>
+#include <string.h>
+#include <NdbMem.h>
+#include <NdbThread.h>
+#include <signaldata/FsOpenReq.hpp>
+
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+
+#ifdef NDB_LINUX
+// This is for pread and pwrite
+#ifndef __USE_UNIX98
+#define __USE_UNIX98
+#endif
+#endif
+
+#include <NdbUnistd.h>
+#if defined NDB_WIN32 || defined NDB_OSE || defined NDB_SOFTOSE
+#include <NdbStdio.h>
+#else
+// For readv and writev
+#include <sys/uio.h>
+#endif
+
+#ifndef NDB_WIN32
+#include <dirent.h>
+#endif
+
+// Use this define if you want printouts from AsyncFile class
+//#define DEBUG_ASYNCFILE
+
+#ifdef DEBUG_ASYNCFILE
+#include <NdbOut.hpp>
+#define DEBUG(x) x
+#define PRINT_ERRORANDFLAGS(f) printErrorAndFlags(f)
+void printErrorAndFlags(Uint32 used_flags);
+#else
+#define DEBUG(x)
+#define PRINT_ERRORANDFLAGS(f)
+#endif
+
+// Define the size of the write buffer (for each thread)
+#if defined NDB_SOFTOSE || defined NDB_OSE
+#define WRITEBUFFERSIZE 65536
+#else
+#define WRITEBUFFERSIZE 262144
+#endif
+
+const char *actionName[] = {
+ "open",
+ "close",
+ "closeRemove",
+ "read",
+ "readv",
+ "write",
+ "writev",
+ "writeSync",
+ "writevSync",
+ "sync",
+ "end" };
+
+static int numAsyncFiles = 0;
+
+extern "C" void * runAsyncFile(void* arg)
+{
+ ((AsyncFile*)arg)->run();
+ return (NULL);
+}
+
+AsyncFile::AsyncFile() :
+ theFileName(),
+#ifdef NDB_WIN32
+ hFile(INVALID_HANDLE_VALUE),
+#else
+ theFd(-1),
+#endif
+ theReportTo(0),
+ theMemoryChannelPtr(NULL)
+{
+}
+
+void
+AsyncFile::doStart(const char * filesystemPath) {
+ theFileName.init(filesystemPath);
+
+ // Stacksize for filesystem threads
+ // An 8k stack should be enough
+ const NDB_THREAD_STACKSIZE stackSize = 8192;
+
+ char buf[16];
+ numAsyncFiles++;
+ snprintf(buf, sizeof(buf), "AsyncFile%d", numAsyncFiles);
+
+ theStartMutexPtr = NdbMutex_Create();
+ theStartConditionPtr = NdbCondition_Create();
+ NdbMutex_Lock(theStartMutexPtr);
+ theStartFlag = false;
+ theThreadPtr = NdbThread_Create(runAsyncFile,
+ (void**)this,
+ stackSize,
+ (char*)&buf,
+ NDB_THREAD_PRIO_MEAN);
+
+ NdbCondition_Wait(theStartConditionPtr,
+ theStartMutexPtr);
+ NdbMutex_Unlock(theStartMutexPtr);
+ NdbMutex_Destroy(theStartMutexPtr);
+ NdbCondition_Destroy(theStartConditionPtr);
+}
+
+AsyncFile::~AsyncFile()
+{
+ void *status;
+ Request request;
+ request.action = Request::end;
+ theMemoryChannelPtr->writeChannel( &request );
+ NdbThread_WaitFor(theThreadPtr, &status);
+ NdbThread_Destroy(&theThreadPtr);
+ delete theMemoryChannelPtr;
+}
+
+void
+AsyncFile::reportTo( MemoryChannel<Request> *reportTo )
+{
+ theReportTo = reportTo;
+}
+
+void AsyncFile::execute(Request* request)
+{
+ theMemoryChannelPtr->writeChannel( request );
+}
+
+void
+AsyncFile::run()
+{
+ Request *request;
+ // Create theMemoryChannel in the thread that will wait for it
+ NdbMutex_Lock(theStartMutexPtr);
+ theMemoryChannelPtr = new MemoryChannel<Request>();
+ theStartFlag = true;
+ // Create write buffer for bigger writes
+ theWriteBufferSize = WRITEBUFFERSIZE;
+ theWriteBuffer = (char *) NdbMem_Allocate(theWriteBufferSize);
+ NdbMutex_Unlock(theStartMutexPtr);
+ NdbCondition_Signal(theStartConditionPtr);
+
+ if (!theWriteBuffer) {
+ DEBUG(ndbout_c("AsyncFile::writeReq, Failed allocating write buffer"));
+ return;
+ }//if
+
+ while (1) {
+ request = theMemoryChannelPtr->readChannel();
+ if (!request) {
+ DEBUG(ndbout_c("Nothing read from Memory Channel in AsyncFile"));
+ endReq();
+ return;
+ }//if
+ switch (request->action) {
+ case Request:: open:
+ openReq(request);
+ break;
+ case Request:: close:
+ closeReq(request);
+ break;
+ case Request:: closeRemove:
+ closeReq(request);
+ removeReq(request);
+ break;
+ case Request:: read:
+ readReq(request);
+ break;
+ case Request:: readv:
+ readvReq(request);
+ break;
+ case Request:: write:
+ writeReq(request);
+ break;
+ case Request:: writev:
+ writevReq(request);
+ break;
+ case Request:: writeSync:
+ writeReq(request);
+ syncReq(request);
+ break;
+ case Request:: writevSync:
+ writevReq(request);
+ syncReq(request);
+ break;
+ case Request:: sync:
+ syncReq(request);
+ break;
+ case Request:: append:
+ appendReq(request);
+ break;
+ case Request::rmrf:
+ rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
+ break;
+ case Request:: end:
+ closeReq(request);
+ endReq();
+ return;
+ default:
+ THREAD_REQUIRE(false, "Using default switch in AsyncFile::run");
+ break;
+ }//switch
+ theReportTo->writeChannel(request);
+ }//while
+}//AsyncFile::run()
+
+extern bool Global_useO_SYNC;
+extern bool Global_useO_DIRECT;
+extern bool Global_unlinkO_CREAT;
+extern Uint32 Global_syncFreq;
+
+void AsyncFile::openReq(Request* request)
+{
+ m_openedWithSync = false;
+ m_syncFrequency = 0;
+
+ // for open.flags, see signal FSOPENREQ
+#ifdef NDB_WIN32
+ DWORD dwCreationDisposition;
+ DWORD dwDesiredAccess = 0;
+ DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
+ DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_NO_BUFFERING;
+ const Uint32 flags = request->par.open.flags;
+
+ // Convert file open flags from Solaris to Windows
+ if ((flags & FsOpenReq::OM_CREATE) && (flags & FsOpenReq::OM_TRUNCATE)){
+ dwCreationDisposition = CREATE_ALWAYS;
+ } else if (flags & FsOpenReq::OM_TRUNCATE){
+ dwCreationDisposition = TRUNCATE_EXISTING;
+ } else if (flags & FsOpenReq::OM_CREATE){
+ dwCreationDisposition = CREATE_NEW;
+ } else {
+ dwCreationDisposition = OPEN_EXISTING;
+ }
+
+ switch(flags & 3){
+ case FsOpenReq::OM_READONLY:
+ dwDesiredAccess = GENERIC_READ;
+ break;
+ case FsOpenReq::OM_WRITEONLY:
+ dwDesiredAccess = GENERIC_WRITE;
+ break;
+ case FsOpenReq::OM_READWRITE:
+ dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
+ break;
+ default:
+ request->error = 1000;
+ break;
+ return;
+ }
+
+ hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,
+ 0, dwCreationDisposition, dwFlagsAndAttributes, 0);
+
+ if(INVALID_HANDLE_VALUE == hFile) {
+ request->error = GetLastError();
+ if(((ERROR_PATH_NOT_FOUND == request->error) || (ERROR_INVALID_NAME == request->error))
+ && (flags & FsOpenReq::OM_CREATE)) {
+ createDirectories();
+ hFile = CreateFile(theFileName.c_str(), dwDesiredAccess, dwShareMode,
+ 0, dwCreationDisposition, dwFlagsAndAttributes, 0);
+
+ if(INVALID_HANDLE_VALUE == hFile)
+ request->error = GetLastError();
+ else
+ request->error = 0;
+
+ return;
+ }
+ }
+ else {
+ request->error = 0;
+ return;
+ }
+#else
+ const Uint32 flags = request->par.open.flags;
+ Uint32 new_flags = 0;
+
+ // Convert file open flags from Solaris to Liux
+ if(flags & FsOpenReq::OM_CREATE){
+ new_flags |= O_CREAT;
+ }
+
+ if(flags & FsOpenReq::OM_TRUNCATE){
+#if 0
+ if(Global_unlinkO_CREAT){
+ unlink(theFileName.c_str());
+ } else
+#endif
+ new_flags |= O_TRUNC;
+ }
+
+ if(flags & FsOpenReq::OM_APPEND){
+ new_flags |= O_APPEND;
+ }
+
+ if(flags & FsOpenReq::OM_SYNC){
+#if 0
+ if(Global_useO_SYNC){
+ new_flags |= O_SYNC;
+ m_openedWithSync = true;
+ m_syncFrequency = 0;
+ } else {
+#endif
+ m_openedWithSync = false;
+ m_syncCount = 0;
+ m_syncFrequency = Global_syncFreq;
+#if 0
+ }
+#endif
+ } else {
+ m_openedWithSync = false;
+ m_syncFrequency = 0;
+ }
+
+#if 0
+#if NDB_LINUX
+ if(Global_useO_DIRECT){
+ new_flags |= O_DIRECT;
+ }
+#endif
+#endif
+
+ switch(flags & 0x3){
+ case FsOpenReq::OM_READONLY:
+ new_flags |= O_RDONLY;
+ break;
+ case FsOpenReq::OM_WRITEONLY:
+ new_flags |= O_WRONLY;
+ break;
+ case FsOpenReq::OM_READWRITE:
+ new_flags |= O_RDWR;
+ break;
+ default:
+ request->error = 1000;
+ break;
+ return;
+ }
+ const int mode = S_IRUSR | S_IWUSR | S_IRGRP;
+
+ if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) {
+ PRINT_ERRORANDFLAGS(new_flags);
+ if( (errno == ENOENT ) && (new_flags & O_CREAT ) ) {
+ createDirectories();
+ if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) {
+ PRINT_ERRORANDFLAGS(new_flags);
+ request->error = errno;
+ }
+ } else {
+ request->error = errno;
+ }
+ }
+#endif
+}
+
+int
+AsyncFile::readBuffer(char * buf, size_t size, off_t offset){
+ int return_value;
+
+#ifdef NDB_WIN32
+ DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
+ if(dwSFP != offset) {
+ return GetLastError();
+ }
+#elif defined NDB_OSE || defined NDB_SOFTOSE
+ return_value = lseek(theFd, offset, SEEK_SET);
+ if (return_value != offset) {
+ return errno;
+ }
+#endif
+
+ while (size > 0) {
+ size_t bytes_read = 0;
+
+#ifdef NDB_WIN32
+ DWORD dwBytesRead;
+ BOOL bRead = ReadFile(hFile,
+ buf,
+ size,
+ &dwBytesRead,
+ 0);
+ if(!bRead){
+ return GetLastError();
+ }
+ bytes_read = dwBytesRead;
+#elif defined NDB_OSE || defined NDB_SOFTOSE
+ return_value = ::read(theFd, buf, size);
+#else // UNIX
+ return_value = ::pread(theFd, buf, size, offset);
+#endif
+#ifndef NDB_WIN32
+ if (return_value == -1 && errno == EINTR) {
+ DEBUG(ndbout_c("EINTR in read"));
+ continue;
+ } else if (return_value == -1){
+ return errno;
+ } else {
+ bytes_read = return_value;
+ }
+#endif
+
+ if(bytes_read == 0){
+ DEBUG(ndbout_c("Read underflow %d %d\n %x\n%d %d",
+ size, offset, buf, bytes_read, return_value));
+ return ERR_ReadUnderflow;
+ }
+
+ if(bytes_read != size){
+ DEBUG(ndbout_c("Warning partial read %d != %d",
+ bytes_read, size));
+ }
+
+ buf += bytes_read;
+ size -= bytes_read;
+ offset += bytes_read;
+ }
+ return 0;
+}
+
+void
+AsyncFile::readReq( Request * request)
+{
+ for(int i = 0; i < request->par.readWrite.numberOfPages ; i++) {
+ off_t offset = request->par.readWrite.pages[i].offset;
+ size_t size = request->par.readWrite.pages[i].size;
+ char * buf = request->par.readWrite.pages[i].buf;
+
+ int err = readBuffer(buf, size, offset);
+ if(err != 0){
+ request->error = err;
+ return;
+ }
+ }
+}
+
+void
+AsyncFile::readvReq( Request * request)
+{
+#if defined NDB_OSE || defined NDB_SOFTOSE
+ readReq(request);
+ return;
+#elif defined NDB_WIN32
+ // ReadFileScatter?
+ readReq(request);
+ return;
+#else
+ int return_value;
+ int length = 0;
+ struct iovec iov[20]; // the parameter in the signal restricts this to 20 deep
+ for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {
+ iov[i].iov_base= request->par.readWrite.pages[i].buf;
+ iov[i].iov_len= request->par.readWrite.pages[i].size;
+ length = length + iov[i].iov_len;
+ }
+ lseek( theFd, request->par.readWrite.pages[0].offset, SEEK_SET );
+ return_value = ::readv(theFd, iov, request->par.readWrite.numberOfPages);
+ if (return_value == -1) {
+ request->error = errno;
+ return;
+ } else if (return_value != length) {
+ request->error = 1011;
+ return;
+ }
+#endif
+}
+
+int
+AsyncFile::extendfile(Request* request) {
+#if defined NDB_OSE || defined NDB_SOFTOSE
+ // Find max size of this file in this request
+ int maxOffset = 0;
+ int maxSize = 0;
+ for(int i=0; i < request->par.readWrite.numberOfPages ; i++) {
+ if (request->par.readWrite.pages[i].offset > maxOffset) {
+ maxOffset = request->par.readWrite.pages[i].offset;
+ maxSize = request->par.readWrite.pages[i].size;
+ }
+ }
+ DEBUG(ndbout_c("extendfile: maxOffset=%d, size=%d", maxOffset, maxSize));
+
+ // Allocate a buffer and fill it with zeros
+ void* pbuf = malloc(maxSize);
+ memset(pbuf, 0, maxSize);
+ for (int p = 0; p <= maxOffset; p = p + maxSize) {
+ int return_value;
+ return_value = lseek(theFd,
+ p,
+ SEEK_SET);
+ if((return_value == -1 ) || (return_value != p)) {
+ return -1;
+ }
+ return_value = ::write(theFd,
+ pbuf,
+ maxSize);
+ if ((return_value == -1) || (return_value != maxSize)) {
+ return -1;
+ }
+ }
+ free(pbuf);
+
+ DEBUG(ndbout_c("extendfile: \"%s\" OK!", theFileName.c_str()));
+ return 0;
+#else
+ request = request;
+ abort();
+ return -1;
+#endif
+}
+
+void
+AsyncFile::writeReq( Request * request)
+{
+ int page_num = 0;
+ bool write_not_complete = true;
+
+ while(write_not_complete) {
+ int totsize = 0;
+ off_t offset = request->par.readWrite.pages[page_num].offset;
+ char* bufptr = theWriteBuffer;
+
+ write_not_complete = false;
+ if (request->par.readWrite.numberOfPages > 1) {
+ off_t page_offset = offset;
+
+ // Multiple page write, copy to buffer for one write
+ for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
+ memcpy(bufptr,
+ request->par.readWrite.pages[i].buf,
+ request->par.readWrite.pages[i].size);
+ bufptr += request->par.readWrite.pages[i].size;
+ totsize += request->par.readWrite.pages[i].size;
+ if (((i + 1) < request->par.readWrite.numberOfPages)) {
+ // There are more pages to write
+ // Check that offsets are consequtive
+ if ((page_offset + request->par.readWrite.pages[i].size)
+ !=
+ request->par.readWrite.pages[i+1].offset) {
+ // Next page is not aligned with previous, not allowed
+ DEBUG(ndbout_c("Page offsets are not aligned"));
+ request->error = EINVAL;
+ return;
+ }
+ if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
+ // We are not finished and the buffer is full
+ write_not_complete = true;
+ // Start again with next page
+ page_num = i + 1;
+ break;
+ }
+ }
+ page_offset += request->par.readWrite.pages[i].size;
+ }
+ bufptr = theWriteBuffer;
+ } else {
+ // One page write, write page directly
+ bufptr = request->par.readWrite.pages[0].buf;
+ totsize = request->par.readWrite.pages[0].size;
+ }
+ int err = writeBuffer(bufptr, totsize, offset);
+ if(err != 0){
+ request->error = err;
+ return;
+ }
+ } // while(write_not_complete)
+}
+
+int
+AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
+ size_t chunk_size)
+{
+ size_t bytes_to_write = chunk_size;
+ int return_value;
+
+#ifdef NDB_WIN32
+ DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
+ if(dwSFP != offset) {
+ return GetLastError();
+ }
+#elif defined NDB_OSE || defined NDB_SOFTOSE
+ return_value = lseek(theFd, offset, SEEK_SET);
+ if (return_value != offset) {
+ DEBUG(ndbout_c("AsyncFile::writeReq, err1: return_value=%d, offset=%d\n",
+ return_value, chunk_offset));
+ PRINT_ERRORANDFLAGS(0);
+ if (errno == 78) {
+ // Could not write beyond end of file, try to extend file
+ DEBUG(ndbout_c("AsyncFile::writeReq, Extend. file! filename=\"%s\" \n",
+ theFileName.c_str()));
+ return_value = extendfile(request);
+ if (return_value == -1) {
+ return errno;
+ }
+ return_value = lseek(theFd, offset, SEEK_SET);
+ if (return_value != offset) {
+ return errno;
+ }
+ } else {
+ return errno;
+ }
+ }
+#endif
+
+ while (size > 0) {
+ if (size < bytes_to_write){
+ // We are at the last chunk
+ bytes_to_write = size;
+ }
+ size_t bytes_written = 0;
+
+#ifdef NDB_WIN32
+ DWORD dwWritten;
+ BOOL bWrite = WriteFile(hFile, buf, bytes_to_write, &dwWritten, 0);
+ if(!bWrite) {
+ return GetLastError();
+ }
+ bytes_written = dwWritten;
+ if (bytes_written != bytes_to_write) {
+ DEBUG(ndbout_c("Warning partial write %d != %d", bytes_written, bytes_to_write));
+ }
+
+#elif defined NDB_OSE || defined NDB_SOFTOSE
+ return_value = ::write(theFd, buf, bytes_to_write);
+#else // UNIX
+ return_value = ::pwrite(theFd, buf, bytes_to_write, offset);
+#endif
+#ifndef NDB_WIN32
+ if (return_value == -1 && errno == EINTR) {
+ bytes_written = 0;
+ DEBUG(ndbout_c("EINTR in write"));
+ } else if (return_value == -1){
+ return errno;
+ } else {
+ bytes_written = return_value;
+
+ if(bytes_written == 0){
+ abort();
+ }
+
+ if(bytes_written != bytes_to_write){
+ DEBUG(ndbout_c("Warning partial write %d != %d",
+ bytes_written, bytes_to_write));
+ }
+ }
+#endif
+
+ buf += bytes_written;
+ size -= bytes_written;
+ offset += bytes_written;
+ }
+ return 0;
+}
+
+void
+AsyncFile::writevReq( Request * request)
+{
+ // WriteFileGather on WIN32?
+ writeReq(request);
+}
+
+
+void
+AsyncFile::closeReq(Request * request)
+{
+ syncReq(request);
+#ifdef NDB_WIN32
+ if(!CloseHandle(hFile)) {
+ request->error = GetLastError();
+ }
+ hFile = INVALID_HANDLE_VALUE;
+#else
+ if (-1 == ::close(theFd)) {
+ request->error = errno;
+ }
+ theFd = -1;
+#endif
+}
+
+bool AsyncFile::isOpen(){
+#ifdef NDB_WIN32
+ return (hFile != INVALID_HANDLE_VALUE);
+#else
+ return (theFd != -1);
+#endif
+}
+
+
+void
+AsyncFile::syncReq(Request * request)
+{
+ if(m_openedWithSync){
+ return;
+ }
+#ifdef NDB_WIN32
+ if(!FlushFileBuffers(hFile)) {
+ request->error = GetLastError();
+ return;
+ }
+#else
+ if (-1 == ::fsync(theFd)){
+ request->error = errno;
+ return;
+ }
+#endif
+ m_syncCount = 0;
+}
+
+void
+AsyncFile::appendReq(Request * request){
+
+ const char * buf = request->par.append.buf;
+ Uint32 size = request->par.append.size;
+
+ m_syncCount += size;
+
+#ifdef NDB_WIN32
+ DWORD dwWritten = 0;
+ while(size > 0){
+ if(!WriteFile(hFile, buf, size, &dwWritten, 0)){
+ request->error = GetLastError();
+ return ;
+ }
+
+ buf += dwWritten;
+ size -= dwWritten;
+ }
+#else
+ while(size > 0){
+ const int n = write(theFd, buf, size);
+ if(n == -1 && errno == EINTR){
+ continue;
+ }
+ if(n == -1){
+ request->error = errno;
+ return;
+ }
+ if(n == 0){
+ abort();
+ }
+ size -= n;
+ buf += n;
+ }
+#endif
+
+ if(m_syncFrequency != 0 && m_syncCount > m_syncFrequency){
+ syncReq(request);
+ request->error = 0;
+ }
+}
+
+void
+AsyncFile::removeReq(Request * request)
+{
+#ifdef NDB_WIN32
+ if(!DeleteFile(theFileName.c_str())) {
+ request->error = GetLastError();
+ }
+#else
+ if (-1 == ::remove(theFileName.c_str())) {
+ request->error = errno;
+
+ }
+#endif
+}
+
+void
+AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
+ Uint32 path_len = strlen(path);
+ Uint32 path_max_copy = PATH_MAX - path_len;
+ char* path_add = &path[path_len];
+#ifndef NDB_WIN32
+ if(!request->par.rmrf.directory){
+ // Remove file
+ if(unlink((const char *)path) != 0 && errno != ENOENT)
+ request->error = errno;
+ return;
+ }
+ // Remove directory
+ DIR* dirp = opendir((const char *)path);
+ if(dirp == 0){
+ if(errno != ENOENT)
+ request->error = errno;
+ return;
+ }
+ struct dirent * dp;
+ while ((dp = readdir(dirp)) != NULL){
+ if ((strcmp(".", dp->d_name) != 0) && (strcmp("..", dp->d_name) != 0)) {
+ snprintf(path_add, (size_t)path_max_copy, "%s%s",
+ DIR_SEPARATOR, dp->d_name);
+ if(remove((const char*)path) == 0){
+ path[path_len] = 0;
+ continue;
+ }
+
+ rmrfReq(request, path, true);
+ path[path_len] = 0;
+ if(request->error != 0){
+ closedir(dirp);
+ return;
+ }
+ }
+ }
+ closedir(dirp);
+ if(removePath && rmdir((const char *)path) != 0){
+ request->error = errno;
+ }
+ return;
+#else
+
+ if(!request->par.rmrf.directory){
+ // Remove file
+ if(!DeleteFile(path)){
+ DWORD dwError = GetLastError();
+ if(dwError!=ERROR_FILE_NOT_FOUND)
+ request->error = dwError;
+ }
+ return;
+ }
+
+ strcat(path, "\\*");
+ WIN32_FIND_DATA ffd;
+ HANDLE hFindFile = FindFirstFile(path, &ffd);
+ path[path_len] = 0;
+ if(INVALID_HANDLE_VALUE==hFindFile){
+ DWORD dwError = GetLastError();
+ if(dwError!=ERROR_PATH_NOT_FOUND)
+ request->error = dwError;
+ return;
+ }
+
+ do {
+ if(0!=strcmp(".", ffd.cFileName) && 0!=strcmp("..", ffd.cFileName)){
+ strcat(path, "\\");
+ strcat(path, ffd.cFileName);
+ if(DeleteFile(path)) {
+ path[path_len] = 0;
+ continue;
+ }//if
+
+ rmrfReq(request, path, true);
+ path[path_len] = 0;
+ if(request->error != 0){
+ FindClose(hFindFile);
+ return;
+ }
+ }
+ } while(FindNextFile(hFindFile, &ffd));
+
+ FindClose(hFindFile);
+
+ if(removePath && !RemoveDirectory(path))
+ request->error = GetLastError();
+
+#endif
+}
+
+void AsyncFile::endReq()
+{
+ // Thread is ended with return
+ if (theWriteBuffer) NdbMem_Free(theWriteBuffer);
+ NdbThread_Exit(0);
+}
+
+
+void AsyncFile::createDirectories()
+{
+ for (int i = 0; i < theFileName.levels(); i++) {
+#ifdef NDB_WIN32
+ CreateDirectory(theFileName.directory(i), 0);
+#else
+ //printf("AsyncFile::createDirectories : \"%s\"\n", theFileName.directory(i));
+ mkdir(theFileName.directory(i), S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);
+#endif
+ }
+}
+
+#ifdef DEBUG_ASYNCFILE
+void printErrorAndFlags(Uint32 used_flags) {
+ char buf[255];
+ sprintf(buf, "PEAF: errno=%d \"", errno);
+
+ switch(errno) {
+ case EACCES:
+ strcat(buf, "EACCES");
+ break;
+ case EDQUOT:
+ strcat(buf, "EDQUOT");
+ break;
+ case EEXIST :
+ strcat(buf, "EEXIST");
+ break;
+ case EINTR :
+ strcat(buf, "EINTR");
+ break;
+ case EFAULT :
+ strcat(buf, "EFAULT");
+ break;
+ case EIO :
+ strcat(buf, "EIO");
+ break;
+ case EISDIR :
+ strcat(buf, "EISDIR");
+ break;
+ case ELOOP :
+ strcat(buf, "ELOOP");
+ break;
+ case EMFILE :
+ strcat(buf, "EMFILE");
+ break;
+ case ENFILE :
+ strcat(buf, "ENFILE");
+ break;
+ case ENOENT :
+ strcat(buf, "ENOENT ");
+ break;
+ case ENOSPC :
+ strcat(buf, "ENOSPC");
+ break;
+ case ENOTDIR :
+ strcat(buf, "ENOTDIR");
+ break;
+ case ENXIO :
+ strcat(buf, "ENXIO");
+ break;
+ case EOPNOTSUPP:
+ strcat(buf, "EOPNOTSUPP");
+ break;
+#if !defined NDB_OSE && !defined NDB_SOFTOSE
+ case EMULTIHOP :
+ strcat(buf, "EMULTIHOP");
+ break;
+ case ENOLINK :
+ strcat(buf, "ENOLINK");
+ break;
+ case ENOSR :
+ strcat(buf, "ENOSR");
+ break;
+ case EOVERFLOW :
+ strcat(buf, "EOVERFLOW");
+ break;
+#endif
+ case EROFS :
+ strcat(buf, "EROFS");
+ break;
+ case EAGAIN :
+ strcat(buf, "EAGAIN");
+ break;
+ case EINVAL :
+ strcat(buf, "EINVAL");
+ break;
+ case ENOMEM :
+ strcat(buf, "ENOMEM");
+ break;
+ case ETXTBSY :
+ strcat(buf, "ETXTBSY");
+ break;
+ case ENAMETOOLONG:
+ strcat(buf, "ENAMETOOLONG");
+ break;
+ case EBADF:
+ strcat(buf, "EBADF");
+ break;
+ case ESPIPE:
+ strcat(buf, "ESPIPE");
+ break;
+ case ESTALE:
+ strcat(buf, "ESTALE");
+ break;
+ default:
+ strcat(buf, "EOTHER");
+ break;
+ }
+ strcat(buf, "\" ");
+#if defined NDB_OSE
+ strcat(buf, strerror(errno) << " ");
+#endif
+ strcat(buf, " flags: ");
+ switch(used_flags & 3){
+ case O_RDONLY:
+ strcat(buf, "O_RDONLY, ");
+ break;
+ case O_WRONLY:
+ strcat(buf, "O_WRONLY, ");
+ break;
+ case O_RDWR:
+ strcat(buf, "O_RDWR, ");
+ break;
+ default:
+ strcat(buf, "Unknown!!, ");
+ }
+
+ if((used_flags & O_APPEND)==O_APPEND)
+ strcat(buf, "O_APPEND, ");
+ if((used_flags & O_CREAT)==O_CREAT)
+ strcat(buf, "O_CREAT, ");
+ if((used_flags & O_EXCL)==O_EXCL)
+ strcat(buf, "O_EXCL, ");
+ if((used_flags & O_NOCTTY) == O_NOCTTY)
+ strcat(buf, "O_NOCTTY, ");
+ if((used_flags & O_NONBLOCK)==O_NONBLOCK)
+ strcat(buf, "O_NONBLOCK, ");
+ if((used_flags & O_TRUNC)==O_TRUNC)
+ strcat(buf, "O_TRUNC, ");
+#if !defined NDB_OSE && !defined NDB_SOFTOSE
+ if((used_flags & O_DSYNC)==O_DSYNC)
+ strcat(buf, "O_DSYNC, ");
+ if((used_flags & O_NDELAY)==O_NDELAY)
+ strcat(buf, "O_NDELAY, ");
+ if((used_flags & O_RSYNC)==O_RSYNC)
+ strcat(buf, "O_RSYNC, ");
+ if((used_flags & O_SYNC)==O_SYNC)
+ strcat(buf, "O_SYNC, ");
+ DEBUG(ndbout_c(buf));
+#endif
+
+}
+#endif
diff --git a/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp b/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
new file mode 100644
index 00000000000..caa03e52d0c
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
@@ -0,0 +1,234 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef AsyncFile_H
+#define AsyncFile_H
+
+//===========================================================================
+//
+// .DESCRIPTION
+// Asynchronous file, All actions are executed concurrently with other
+// activity of the process.
+// Because all action are performed in a seperated thread the result of
+// of a action is send back tru a memory channel.
+// For the asyncronise notivication of a finished request all the calls
+// have a request as paramater, the user can use the userData pointer
+// to add information it needs when the request is send back.
+//
+//
+// .TYPICAL USE:
+// Writing or reading data to/from disk concurrently to other activities.
+//
+//===========================================================================
+//=============================================================================
+//
+// .PUBLIC
+//
+//=============================================================================
+///////////////////////////////////////////////////////////////////////////////
+//
+// AsyncFile( );
+// Description:
+// Initialisation of the class.
+// Parameters:
+// -
+///////////////////////////////////////////////////////////////////////////////
+//
+// ~AsyncFile( );
+// Description:
+// Tell the thread to stop and wait for it to return
+// Parameters:
+// -
+///////////////////////////////////////////////////////////////////////////////
+//
+// doStart( );
+// Description:
+// Spawns the new thread.
+// Parameters:
+// Base path of filesystem
+//
+///////////////////////////////////////////////////////////////////////////////
+//
+// void execute(Request *request);
+// Description:
+// performens the requered action.
+// Parameters:
+// request: request to be called when open is finished.
+// action= open|close|read|write|sync
+// if action is open then:
+// par.open.flags= UNIX open flags, see man open
+// par.open.name= name of the file to open
+// if action is read or write then:
+// par.readWrite.buf= user provided buffer to read/write
+// the data from/to
+// par.readWrite.size= how many bytes must be read/written
+// par.readWrite.offset= absolute offset in file in bytes
+// return:
+// return values are stored in the request error field:
+// error= return state of the action, UNIX error see man open/errno
+// userData= is untouched can be used be user.
+//
+///////////////////////////////////////////////////////////////////////////////
+//
+// void reportTo( MemoryChannel<Request> *reportTo );
+// Description:
+// set the channel where the file must report the result of the
+// actions back to.
+// Parameters:
+// reportTo: the memory channel to use use MemoryChannelMultipleWriter
+// if more
+// than one file uses this channel to report back.
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#include <kernel_types.h>
+#include "MemoryChannel.hpp"
+#include "Filename.hpp"
+
+const int ERR_ReadUnderflow = 1000;
+
+const int WRITECHUNK = 262144;
+
+class AsyncFile;
+
+class Request
+{
+public:
+ enum Action {
+ open,
+ close,
+ closeRemove,
+ read, // Allways leave readv directly after
+ // read because SimblockAsyncFileSystem depends on it
+ readv,
+ write,// Allways leave writev directly after
+ // write because SimblockAsyncFileSystem depends on it
+ writev,
+ writeSync,// Allways leave writevSync directly after
+ // writeSync because SimblockAsyncFileSystem depends on it
+ writevSync,
+ sync,
+ end,
+ append,
+ rmrf
+ };
+ Action action;
+ union {
+ struct {
+ Uint32 flags;
+ } open;
+ struct {
+ int numberOfPages;
+ struct{
+ char *buf;
+ size_t size;
+ off_t offset;
+ } pages[16];
+ } readWrite;
+ struct {
+ const char * buf;
+ size_t size;
+ } append;
+ struct {
+ bool directory;
+ bool own_directory;
+ } rmrf;
+ } par;
+ int error;
+
+ void set(BlockReference userReference,
+ Uint32 userPointer,
+ Uint16 filePointer);
+ BlockReference theUserReference;
+ Uint32 theUserPointer;
+ Uint16 theFilePointer;
+ // Information for open, needed if the first open action fails.
+ AsyncFile* file;
+ Uint32 theTrace;
+};
+
+
+inline
+void
+Request::set(BlockReference userReference,
+ Uint32 userPointer, Uint16 filePointer)
+{
+ theUserReference= userReference;
+ theUserPointer= userPointer;
+ theFilePointer= filePointer;
+}
+
+class AsyncFile
+{
+public:
+ AsyncFile();
+ ~AsyncFile();
+
+ void reportTo( MemoryChannel<Request> *reportTo );
+
+ void execute( Request* request );
+
+ void doStart(const char * fspath);
+ // its a thread so its always running
+ void run();
+
+ bool isOpen();
+
+ Filename theFileName;
+private:
+
+ void openReq(Request *request);
+ void readReq(Request *request);
+ void readvReq(Request *request);
+ void writeReq(Request *request);
+ void writevReq(Request *request);
+
+ void closeReq(Request *request);
+ void syncReq(Request *request);
+ void removeReq(Request *request);
+ void appendReq(Request *request);
+ void rmrfReq(Request *request, char * path, bool removePath);
+ void endReq();
+
+ int readBuffer(char * buf, size_t size, off_t offset);
+ int writeBuffer(const char * buf, size_t size, off_t offset,
+ size_t chunk_size = WRITECHUNK);
+
+ int extendfile(Request* request);
+ void createDirectories();
+
+#ifdef NDB_WIN32
+ HANDLE hFile;
+#else
+ int theFd;
+#endif
+
+ MemoryChannel<Request> *theReportTo;
+ MemoryChannel<Request>* theMemoryChannelPtr;
+
+ struct NdbThread* theThreadPtr;
+ NdbMutex* theStartMutexPtr;
+ NdbCondition* theStartConditionPtr;
+ bool theStartFlag;
+ int theWriteBufferSize;
+ char* theWriteBuffer;
+
+ bool m_openedWithSync;
+ Uint32 m_syncCount;
+ Uint32 m_syncFrequency;
+};
+
+#endif
diff --git a/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/AsyncFileTest.cpp b/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/AsyncFileTest.cpp
new file mode 100644
index 00000000000..b9954ba130f
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/AsyncFileTest.cpp
@@ -0,0 +1,697 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+//#define TESTDEBUG 1
+
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <string.h>
+#include <kernel_types.h>
+#include <Pool.hpp>
+#include "AsyncFile.hpp"
+#include "NdbOut.hpp"
+#include "NdbTick.h"
+#include "NdbThread.h"
+#include "NdbMain.h"
+
+// Test and benchmark functionality of AsyncFile
+// -n Number of files
+// -r Number of simultaneous requests
+// -s Filesize, number of pages
+// -l Number of iterations
+// -remove, remove files after close
+// -reverse, write files in reverse order, start with the last page
+
+#define MAXFILES 255
+#define DEFAULT_NUM_FILES 1
+#define MAXREQUESTS 256
+#define DEFAULT_NUM_REQUESTS 1
+#define MAXFILESIZE 4096
+#define DEFAULT_FILESIZE 2048
+#define FVERSION 0x01000000
+#define PAGESIZE 8192
+
+#define TIMER_START { Uint64 starttick = NdbTick_CurrentMillisecond()
+#define TIMER_PRINT(str, ops) Uint64 stoptick = NdbTick_CurrentMillisecond();\
+ Uint64 totaltime = (stoptick-starttick); \
+ ndbout << ops << " " << str << \
+ " total time " << (int)totaltime << "ms" << endl;\
+ char buf[255];\
+ sprintf(buf, "%d %s/sec\n",(int)((ops*1000)/totaltime), str);\
+ ndbout <<buf << endl;}
+
+static int numberOfFiles = DEFAULT_NUM_FILES;
+static int numberOfRequests = DEFAULT_NUM_REQUESTS;
+static int fileSize = DEFAULT_FILESIZE;
+static int removeFiles = 0;
+static int writeFilesReverse = 0;
+static int numberOfIterations = 1;
+Uint32 FileNameArray[4];
+
+Pool<AsyncFile>* files;
+AsyncFile* openFiles[MAXFILES];
+Pool<Request>* theRequestPool;
+MemoryChannelMultipleWriter<Request>* theReportChannel;
+
+char WritePages[MAXFILES][PAGESIZE];
+char ReadPages[MAXFILES][PAGESIZE];
+
+int readArguments(int argc, const char** argv);
+int openFile(int fileNum);
+int openFileWait();
+int closeFile(int fileNum);
+int closeFileWait();
+int writeFile( int fileNum, int pagenum);
+int writeFileWait();
+int writeSyncFile( int fileNum, int pagenum);
+int writeSyncFileWait();
+int readFile( int fileNum, int pagenum);
+int readFileWait();
+
+
+NDB_COMMAND(aftest, "aftest", "aftest [-n <Number of files>] [-r <Number of simultaneous requests>] [-s <Filesize, number of pages>] [-l <Number of iterations>] [-remove, remove files after close] [-reverse, write files in reverse order, start with the last page]", "Test the AsyncFile class of Ndb", 8192)
+{
+ int s, numReq, numOps;
+
+ readArguments(argc, argv);
+
+ files = new Pool<AsyncFile>(numberOfFiles, 2);
+ theRequestPool = new Pool<Request>;
+ theReportChannel = new MemoryChannelMultipleWriter<Request>;
+
+ ndbout << "AsyncFileTest starting" << endl;
+ ndbout << " " << numberOfFiles << " files" << endl;
+ ndbout << " " << numberOfRequests << " requests" << endl;
+ ndbout << " " << fileSize << " * 8k files" << endl << endl;
+ ndbout << " " << numberOfIterations << " iterations" << endl << endl;
+
+ NdbThread_SetConcurrencyLevel(numberOfFiles+2);
+
+ // initialize data to write to files
+ for (int i = 0; i < MAXFILES; i++) {
+ for (int j = 0; j < PAGESIZE; j++){
+ WritePages[i][j] = (64+i+j)%256;
+ }
+ // memset(&WritePages[i][0], i+64, PAGESIZE);
+ }
+
+ // Set file directory and name
+ // /T27/F27/NDBFS/S27Pnn.data
+ FileNameArray[0] = 27; // T27
+ FileNameArray[1] = 27; // F27
+ FileNameArray[2] = 27; // S27
+ FileNameArray[3] = FVERSION; // Version
+
+ for (int l = 0; l < numberOfIterations; l++)
+ {
+
+ ndbout << "Opening files" << endl;
+ // Open files
+ for (int f = 0; f < numberOfFiles; f++)
+ {
+ openFile(f);
+
+ }
+
+ // Wait for answer
+ openFileWait();
+
+ ndbout << "Files opened!" << endl<< endl;
+
+ // Write to files
+ ndbout << "Started writing" << endl;
+ TIMER_START;
+ s = 0;
+ numReq = 0;
+ numOps = 0;
+ while ( s < fileSize)
+ {
+ for (int r = 0; r < numberOfRequests; r++)
+ {
+ for (int f = 0; f < numberOfFiles; f++)
+ {
+ writeFile(f, s);
+ numReq++;
+ numOps++;
+ }
+
+ s++;
+ }
+
+ while (numReq > 0)
+ {
+ writeFileWait();
+ numReq--;
+ }
+
+ }
+
+ TIMER_PRINT("writes", numOps);
+
+
+ ndbout << "Started reading" << endl;
+ TIMER_START;
+
+ // Read from files
+ s = 0;
+ numReq = 0;
+ numOps = 0;
+ while ( s < fileSize)
+ {
+ for (int r = 0; r < numberOfRequests; r++)
+ {
+ for (int f = 0; f < numberOfFiles; f++)
+ {
+ readFile(f, s);
+ numReq++;
+ numOps++;
+ }
+
+ s++;
+
+ }
+
+ while (numReq > 0)
+ {
+ readFileWait();
+ numReq--;
+ }
+
+ }
+ TIMER_PRINT("reads", numOps);
+
+ ndbout << "Started writing with sync" << endl;
+ TIMER_START;
+
+ // Write to files
+ s = 0;
+ numReq = 0;
+ numOps = 0;
+ while ( s < fileSize)
+ {
+ for (int r = 0; r < numberOfRequests; r++)
+ {
+ for (int f = 0; f < numberOfFiles; f++)
+ {
+ writeSyncFile(f, s);
+ numReq++;
+ numOps++;
+ }
+
+ s++;
+ }
+
+ while (numReq > 0)
+ {
+ writeSyncFileWait();
+ numReq--;
+ }
+
+ }
+
+ TIMER_PRINT("writeSync", numOps);
+
+ // Close files
+ ndbout << "Closing files" << endl;
+ for (int f = 0; f < numberOfFiles; f++)
+ {
+ closeFile(f);
+
+ }
+
+ // Wait for answer
+ closeFileWait();
+
+ ndbout << "Files closed!" << endl<< endl;
+ }
+
+ // Deallocate memory
+ delete files;
+ delete theReportChannel;
+ delete theRequestPool;
+
+ return 0;
+
+}
+
+
+
+int forward( AsyncFile * file, Request* request )
+{
+ file->execute(request);
+ ERROR_CHECK 0;
+ return 1;
+}
+
+int openFile( int fileNum)
+{
+ AsyncFile* file = (AsyncFile *)files->get();
+
+ FileNameArray[3] = fileNum | FVERSION;
+ file->fileName().set( NDBFS_REF, &FileNameArray[0] );
+ ndbout << "openFile: " << file->fileName().c_str() << endl;
+
+ if( ERROR_STATE ) {
+ ERROR_RESET;
+ files->put( file );
+ ndbout << "Failed to set filename" << endl;
+ return 1;
+ }
+ file->reportTo(theReportChannel);
+
+ Request* request = theRequestPool->get();
+ request->action= Request::open;
+ request->error= 0;
+ request->par.open.flags = 0x302; //O_RDWR | O_CREAT | O_TRUNC ; // 770
+ request->set(NDBFS_REF, 0x23456789, fileNum );
+ request->file = file;
+
+ if (!forward(file,request)) {
+ // Something went wrong
+ ndbout << "Could not forward open request" << endl;
+ theRequestPool->put(request);
+ return 1;
+ }
+ return 0;
+}
+
+int closeFile( int fileNum)
+{
+
+ AsyncFile* file = openFiles[fileNum];
+
+ Request* request = theRequestPool->get();
+ if (removeFiles == 1)
+ request->action = Request::closeRemove;
+ else
+ request->action= Request::close;
+
+ request->error= 0;
+ request->set(NDBFS_REF, 0x23456789, fileNum );
+ request->file = file;
+
+ if (!forward(file,request)) {
+ // Something went wrong
+ ndbout << "Could not forward close request" << endl;
+ theRequestPool->put(request);
+ return 1;
+ }
+ return 0;
+}
+
+int writeFile( int fileNum, int pagenum)
+{
+ AsyncFile* file = openFiles[fileNum];
+#ifdef TESTDEBUG
+ ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str()<< endl;
+#endif
+ Request *request = theRequestPool->get();
+ request->action = Request::write;
+ request->error = 0;
+ request->set(NDBFS_REF, pagenum, fileNum);
+ request->file = openFiles[fileNum];
+
+ // Write only one page, choose the correct page for each file using fileNum
+ request->par.readWrite.pages[0].buf = &WritePages[fileNum][0];
+ request->par.readWrite.pages[0].size = PAGESIZE;
+ if (writeFilesReverse == 1)
+ {
+ // write the last page in the files first
+ // This is a normal way for the Blocks in Ndb to write to a file
+ request->par.readWrite.pages[0].offset = (fileSize - pagenum - 1) * PAGESIZE;
+ }
+ else
+ {
+ request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
+ }
+ request->par.readWrite.numberOfPages = 1;
+
+ if (!forward(file,request)) {
+ // Something went wrong
+ ndbout << "Could not forward write request" << endl;
+ theRequestPool->put(request);
+ return 1;
+ }
+ return 0;
+
+}
+
+int writeSyncFile( int fileNum, int pagenum)
+{
+ AsyncFile* file = openFiles[fileNum];
+#ifdef TESTDEBUG
+ ndbout << "writeFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl;
+#endif
+ Request *request = theRequestPool->get();
+ request->action = Request::writeSync;
+ request->error = 0;
+ request->set(NDBFS_REF, pagenum, fileNum);
+ request->file = openFiles[fileNum];
+
+ // Write only one page, choose the correct page for each file using fileNum
+ request->par.readWrite.pages[0].buf = &WritePages[fileNum][0];
+ request->par.readWrite.pages[0].size = PAGESIZE;
+ request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
+ request->par.readWrite.numberOfPages = 1;
+
+ if (!forward(file,request)) {
+ // Something went wrong
+ ndbout << "Could not forward write request" << endl;
+ theRequestPool->put(request);
+ return 1;
+ }
+ return 0;
+
+}
+
+int readFile( int fileNum, int pagenum)
+{
+ AsyncFile* file = openFiles[fileNum];
+#ifdef TESTDEBUG
+ ndbout << "readFile" << fileNum <<": "<<pagenum<<", " << file->fileName().c_str() << endl;
+#endif
+ Request *request = theRequestPool->get();
+ request->action = Request::read;
+ request->error = 0;
+ request->set(NDBFS_REF, pagenum, fileNum);
+ request->file = openFiles[fileNum];
+
+ // Read only one page, choose the correct page for each file using fileNum
+ request->par.readWrite.pages[0].buf = &ReadPages[fileNum][0];
+ request->par.readWrite.pages[0].size = PAGESIZE;
+ request->par.readWrite.pages[0].offset = pagenum * PAGESIZE;
+ request->par.readWrite.numberOfPages = 1;
+
+ if (!forward(file,request)) {
+ // Something went wrong
+ ndbout << "Could not forward read request" << endl;
+ theRequestPool->put(request);
+ return 1;
+ }
+ return 0;
+
+}
+
+int openFileWait()
+{
+ int openedFiles = 0;
+ while (openedFiles < numberOfFiles)
+ {
+ Request* request = theReportChannel->readChannel();
+ if (request)
+ {
+ if (request->action == Request::open)
+ {
+ if (request->error ==0)
+ {
+#ifdef TESTDEBUG
+ ndbout << "Opened file " << request->file->fileName().c_str() << endl;
+#endif
+ openFiles[request->theFilePointer] = request->file;
+ }
+ else
+ {
+ ndbout << "error while opening file" << endl;
+ exit(1);
+ }
+ theRequestPool->put(request);
+ openedFiles++;
+ }
+ else
+ {
+ ndbout << "Unexpected request received" << endl;
+ }
+ }
+ else
+ {
+ ndbout << "Nothing read from theReportChannel" << endl;
+ }
+ }
+ return 0;
+}
+
+int closeFileWait()
+{
+ int closedFiles = 0;
+ while (closedFiles < numberOfFiles)
+ {
+ Request* request = theReportChannel->readChannel();
+ if (request)
+ {
+ if (request->action == Request::close || request->action == Request::closeRemove)
+ {
+ if (request->error ==0)
+ {
+#ifdef TESTDEBUG
+ ndbout << "Closed file " << request->file->fileName().c_str() << endl;
+#endif
+ openFiles[request->theFilePointer] = NULL;
+ files->put(request->file);
+ }
+ else
+ {
+ ndbout << "error while closing file" << endl;
+ exit(1);
+ }
+ theRequestPool->put(request);
+ closedFiles++;
+ }
+ else
+ {
+ ndbout << "Unexpected request received" << endl;
+ }
+ }
+ else
+ {
+ ndbout << "Nothing read from theReportChannel" << endl;
+ }
+ }
+ return 0;
+}
+
+int writeFileWait()
+{
+ Request* request = theReportChannel->readChannel();
+ if (request)
+ {
+ if (request->action == Request::write)
+ {
+ if (request->error == 0)
+ {
+#ifdef TESTDEBUG
+ ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl;
+#endif
+
+ }
+ else
+ {
+ ndbout << "error while writing file, error=" << request->error << endl;
+ exit(1);
+ }
+ theRequestPool->put(request);
+ }
+ else
+ {
+ ndbout << "Unexpected request received" << endl;
+ }
+ }
+ else
+ {
+ ndbout << "Nothing read from theReportChannel" << endl;
+ }
+ return 0;
+}
+
+int writeSyncFileWait()
+{
+ Request* request = theReportChannel->readChannel();
+ if (request)
+ {
+ if (request->action == Request::writeSync)
+ {
+ if (request->error == 0)
+ {
+#ifdef TESTDEBUG
+ ndbout << "writeFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl;
+#endif
+
+ }
+ else
+ {
+ ndbout << "error while writing file" << endl;
+ exit(1);
+ }
+ theRequestPool->put(request);
+ }
+ else
+ {
+ ndbout << "Unexpected request received" << endl;
+ }
+ }
+ else
+ {
+ ndbout << "Nothing read from theReportChannel" << endl;
+ }
+ return 0;
+}
+
+int readFileWait()
+{
+ Request* request = theReportChannel->readChannel();
+ if (request)
+ {
+ if (request->action == Request::read)
+ {
+ if (request->error == 0)
+ {
+#ifdef TESTDEBUG
+ ndbout << "readFileWait"<<request->theFilePointer<<", " << request->theUserPointer<<" "<< request->file->fileName().c_str() << endl;
+#endif
+ if (memcmp(&(ReadPages[request->theFilePointer][0]), &(WritePages[request->theFilePointer][0]), PAGESIZE)!=0)
+ {
+ ndbout <<"Verification error!" << endl;
+ for (int i = 0; i < PAGESIZE; i++ ){
+ ndbout <<" Compare Page " << i << " : " << ReadPages[request->theFilePointer][i] <<", " <<WritePages[request->theFilePointer][i] << endl;;
+ if( ReadPages[request->theFilePointer][i] !=WritePages[request->theFilePointer][i])
+
+ exit(1);
+ }
+ }
+
+ }
+ else
+ {
+ ndbout << "error while reading file" << endl;
+ exit(1);
+ }
+ theRequestPool->put(request);
+ }
+ else
+ {
+ ndbout << "Unexpected request received" << endl;
+ }
+ }
+ else
+ {
+ ndbout << "Nothing read from theReportChannel" << endl;
+ }
+ return 0;
+}
+
+int readArguments(int argc, const char** argv)
+{
+
+ int i = 1;
+ while (argc > 1)
+ {
+ if (strcmp(argv[i], "-n") == 0)
+ {
+ numberOfFiles = atoi(argv[i+1]);
+ if ((numberOfFiles < 1) || (numberOfFiles > MAXFILES))
+ {
+ ndbout << "Wrong number of files, default = "<<DEFAULT_NUM_FILES << endl;
+ numberOfFiles = DEFAULT_NUM_FILES;
+ }
+ }
+ else if (strcmp(argv[i], "-r") == 0)
+ {
+ numberOfRequests = atoi(argv[i+1]);
+ if ((numberOfRequests < 1) || (numberOfRequests > MAXREQUESTS))
+ {
+ ndbout << "Wrong number of requests, default = "<<DEFAULT_NUM_REQUESTS << endl;
+ numberOfRequests = DEFAULT_NUM_REQUESTS;
+ }
+ }
+ else if (strcmp(argv[i], "-s") == 0)
+ {
+ fileSize = atoi(argv[i+1]);
+ if ((fileSize < 1) || (fileSize > MAXFILESIZE))
+ {
+ ndbout << "Wrong number of 8k pages, default = "<<DEFAULT_FILESIZE << endl;
+ fileSize = DEFAULT_FILESIZE;
+ }
+ }
+ else if (strcmp(argv[i], "-l") == 0)
+ {
+ numberOfIterations = atoi(argv[i+1]);
+ if ((numberOfIterations < 1))
+ {
+ ndbout << "Wrong number of iterations, default = 1" << endl;
+ numberOfIterations = 1;
+ }
+ }
+ else if (strcmp(argv[i], "-remove") == 0)
+ {
+ removeFiles = 1;
+ argc++;
+ i--;
+ }
+ else if (strcmp(argv[i], "-reverse") == 0)
+ {
+ ndbout << "Writing files reversed" << endl;
+ writeFilesReverse = 1;
+ argc++;
+ i--;
+ }
+
+ argc -= 2;
+ i = i + 2;
+ }
+
+ if ((fileSize % numberOfRequests)!= 0)
+ {
+ numberOfRequests = numberOfRequests - (fileSize % numberOfRequests);
+ ndbout <<"numberOfRequest must be modulo of filesize" << endl;
+ ndbout << "New numberOfRequest="<<numberOfRequests<<endl;
+ }
+ return 0;
+}
+
+
+// Needed for linking...
+
+void ErrorReporter::handleError(ErrorCategory type, int messageID,
+ const char* problemData, const char* objRef, NdbShutdownType stype)
+{
+
+ ndbout << "ErrorReporter::handleError activated" << endl;
+ ndbout << "type= " << type << endl;
+ ndbout << "messageID= " << messageID << endl;
+ ndbout << "problemData= " << problemData << endl;
+ ndbout << "objRef= " << objRef << endl;
+
+ exit(1);
+}
+
+void ErrorReporter::handleAssert(const char* message, const char* file, int line)
+{
+ ndbout << "ErrorReporter::handleAssert activated" << endl;
+ ndbout << "message= " << message << endl;
+ ndbout << "file= " << file << endl;
+ ndbout << "line= " << line << endl;
+ exit(1);
+}
+
+
+GlobalData globalData;
+
+
+Signal::Signal()
+{
+
+}
+
diff --git a/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/Makefile b/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/Makefile
new file mode 100644
index 00000000000..b0356e6da68
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/AsyncFileTest/Makefile
@@ -0,0 +1,27 @@
+include .defs.mk
+
+TYPE := kernel
+
+BIN_TARGET := aftest
+BIN_TARGET_ARCHIVES := ndbfs portlib trace signaldataprint
+
+SOURCES = AsyncFileTest.cpp
+
+CFLAGS_AsyncFileTest.cpp = -I../
+
+include $(NDB_TOP)/Epilogue.mk
+
+
+# run basic tests
+run_test :
+ $(NDB_TOP)/bin/$(BIN_TARGET)
+ $(NDB_TOP)/bin/$(BIN_TARGET) -n 8 -r 8 -l 10 -remove
+ $(NDB_TOP)/bin/$(BIN_TARGET) -n 8 -r 8 -l 10 -reverse -remove
+ $(NDB_TOP)/bin/$(BIN_TARGET) -n 8 -r 8 -l 10 -s 512 -remove
+ $(NDB_TOP)/bin/$(BIN_TARGET) -n 8 -r 4 -l 1000
+
+
+
+
+
+
diff --git a/ndb/src/kernel/blocks/ndbfs/CircularIndex.cpp b/ndb/src/kernel/blocks/ndbfs/CircularIndex.cpp
new file mode 100644
index 00000000000..30b40097c9b
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/CircularIndex.cpp
@@ -0,0 +1,20 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "CircularIndex.hpp"
+
+
+
diff --git a/ndb/src/kernel/blocks/ndbfs/CircularIndex.hpp b/ndb/src/kernel/blocks/ndbfs/CircularIndex.hpp
new file mode 100644
index 00000000000..349cccdbcb4
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/CircularIndex.hpp
@@ -0,0 +1,116 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef CircularIndex_H
+#define CircularIndex_H
+
+//===========================================================================
+//
+// .DESCRIPTION
+// Building block for circular buffers. It increment as a normal index.
+// untill it it becomes the maximum size then it becomes zero.
+//
+// .TYPICAL USE:
+// to implement a circular buffer.
+//
+// .EXAMPLE:
+// See MemoryChannel.C
+//===========================================================================
+
+///////////////////////////////////////////////////////////////////////////////
+// CircularIndex( int start= 0,int size=256 );
+// Constuctor
+// Parameters:
+// start: where to start to index
+// size : range of the index, will be from 0 to size-1
+///////////////////////////////////////////////////////////////////////////////
+// operator int ();
+// returns the index
+///////////////////////////////////////////////////////////////////////////////
+// void operator ++ ();
+// increments the index with one, of size is reached it is set to zero
+///////////////////////////////////////////////////////////////////////////////
+// friend int full( const CircularIndex& write, const CircularIndex& read );
+// Taken the write index and the read index from a buffer it is calculated
+// if the buffer is full
+// Parameters:
+// write: index used a write index for the buffer
+// read : index used a read index for the buffer
+// return
+// 0 : not full
+// 1 : full
+///////////////////////////////////////////////////////////////////////////////
+// friend int empty( const CircularIndex& write, const CircularIndex& read );
+// Taken the write index and the read index from a buffer it is calculated
+// if the buffer is empty
+// Parameters:
+// write: index used a write index for the buffer
+// read : index used a read index for the buffer
+// return
+// 0 : not empty
+// 1 : empty
+///////////////////////////////////////////////////////////////////////////////
+
+class CircularIndex
+{
+public:
+ inline CircularIndex( int start= 0,int size=256 );
+ operator int ();
+ CircularIndex& operator ++ ();
+ friend int full( const CircularIndex& write, const CircularIndex& read );
+ friend int empty( const CircularIndex& write, const CircularIndex& read );
+private:
+ int theSize;
+ int theIndex;
+};
+
+inline CircularIndex::operator int ()
+{
+ return theIndex;
+}
+
+inline CircularIndex& CircularIndex::operator ++ ()
+{
+ ++theIndex;
+ if( theIndex >= theSize ){
+ theIndex= 0;
+ }
+ return *this;
+}
+
+
+inline int full( const CircularIndex& write, const CircularIndex& read )
+{
+ int readTmp= read.theIndex;
+
+ if( read.theIndex < write.theIndex )
+ readTmp += read.theSize;
+
+ return ( readTmp - write.theIndex) == 1;
+}
+
+inline int empty( const CircularIndex& write, const CircularIndex& read )
+{
+ return read.theIndex == write.theIndex;
+}
+
+
+inline CircularIndex::CircularIndex( int start,int size ):
+ theSize(size),
+ theIndex(start)
+{
+}
+#endif
diff --git a/ndb/src/kernel/blocks/ndbfs/Filename.cpp b/ndb/src/kernel/blocks/ndbfs/Filename.cpp
new file mode 100644
index 00000000000..98ff7c7e4e4
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/Filename.cpp
@@ -0,0 +1,220 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <stdlib.h>
+#include <string.h>
+#include <NdbStdio.h>
+#include <NdbUnistd.h>
+#include <NdbOut.hpp>
+
+#include "Filename.hpp"
+#include "ErrorHandlingMacros.hpp"
+#include "Error.hpp"
+#include "RefConvert.hpp"
+#include "DebuggerNames.hpp"
+
+#include <signaldata/FsOpenReq.hpp>
+
+static const char* fileExtension[] = {
+ ".Data",
+ ".FragLog",
+ ".LocLog",
+ ".FragList",
+ ".TableList",
+ ".SchemaLog",
+ ".sysfile",
+ ".log",
+ ".ctl"
+};
+
+static const Uint32 noOfExtensions = sizeof(fileExtension)/sizeof(char*);
+
+Filename::Filename() :
+ theLevelDepth(0)
+{
+}
+
+void
+Filename::init(const char * pFileSystemPath){
+ if (pFileSystemPath == NULL) {
+ ERROR_SET(fatal, AFS_ERROR_NOPATH, ""," Filename::init()");
+ return;
+ }
+
+ strncpy(theBaseDirectory, pFileSystemPath, PATH_MAX);
+
+ // the environment variable is set,
+ // check that it is pointing on a valid directory
+ //
+ char buf2[PATH_MAX]; memset(buf2, 0,sizeof(buf2));
+#ifdef NDB_WIN32
+ char* szFilePart;
+ if(!GetFullPathName(theBaseDirectory, sizeof(buf2), buf2, &szFilePart)
+ || (::GetFileAttributes(theBaseDirectory)&FILE_ATTRIBUTE_READONLY))
+#else
+ if((::realpath(theBaseDirectory, buf2) == NULL)||
+ (::access(theBaseDirectory, W_OK) != 0))
+#endif
+ {
+ ERROR_SET(fatal, AFS_ERROR_INVALIDPATH, pFileSystemPath, " Filename::init()");
+ }
+ strncpy(theBaseDirectory, buf2, sizeof(theBaseDirectory));
+ // path seems ok, add delimiter if missing
+ if (strcmp(&theBaseDirectory[strlen(theBaseDirectory) - 1],
+ DIR_SEPARATOR) != 0)
+ strcat(theBaseDirectory, DIR_SEPARATOR);
+
+}
+
+
+Filename::~Filename(){
+}
+
+void
+Filename::set(BlockReference blockReference,
+ const Uint32 filenumber[4], bool dir)
+{
+ char buf[PATH_MAX];
+ theLevelDepth = 0;
+ strncpy(theName, theBaseDirectory, PATH_MAX);
+
+ const Uint32 type = FsOpenReq::getSuffix(filenumber);
+ const Uint32 version = FsOpenReq::getVersion(filenumber);
+ switch(version){
+ case 1 :{
+ const Uint32 diskNo = FsOpenReq::v1_getDisk(filenumber);
+ const Uint32 table = FsOpenReq::v1_getTable(filenumber);
+ const Uint32 frag = FsOpenReq::v1_getFragment(filenumber);
+ const Uint32 S_val = FsOpenReq::v1_getS(filenumber);
+ const Uint32 P_val = FsOpenReq::v1_getP(filenumber);
+
+ if (diskNo < 0xff){
+ snprintf(buf, sizeof(buf), "D%d%s", diskNo, DIR_SEPARATOR);
+ strcat(theName, buf);
+ theLevelDepth++;
+ }
+
+ {
+ const char* blockName = getBlockName( refToBlock(blockReference) );
+ if (blockName == NULL){
+ ERROR_SET(ecError, AFS_ERROR_PARAMETER,"","No Block Name");
+ return;
+ }
+ snprintf(buf, sizeof(buf), "%s%s", blockName, DIR_SEPARATOR);
+ strcat(theName, buf);
+ theLevelDepth++;
+ }
+
+ if (table < 0xffffffff){
+ snprintf(buf, sizeof(buf), "T%d%s", table, DIR_SEPARATOR);
+ strcat(theName, buf);
+ theLevelDepth++;
+ }
+
+ if (frag < 0xffffffff){
+ snprintf(buf, sizeof(buf), "F%d%s", frag, DIR_SEPARATOR);
+ strcat(theName, buf);
+ theLevelDepth++;
+ }
+
+
+ if (S_val < 0xffffffff){
+ snprintf(buf, sizeof(buf), "S%d", S_val);
+ strcat(theName, buf);
+ }
+
+ if (P_val < 0xff){
+ snprintf(buf, sizeof(buf), "P%d", P_val);
+ strcat(theName, buf);
+ }
+
+ }
+ break;
+ case 2:{
+ const Uint32 seq = FsOpenReq::v2_getSequence(filenumber);
+ const Uint32 nodeId = FsOpenReq::v2_getNodeId(filenumber);
+ const Uint32 count = FsOpenReq::v2_getCount(filenumber);
+
+ snprintf(buf, sizeof(buf), "BACKUP%sBACKUP-%d%s",
+ DIR_SEPARATOR, seq, DIR_SEPARATOR);
+ strcat(theName, buf);
+ if(count == 0xffffffff) {
+ snprintf(buf, sizeof(buf), "BACKUP-%d.%d",
+ seq, nodeId); strcat(theName, buf);
+ } else {
+ snprintf(buf, sizeof(buf), "BACKUP-%d-%d.%d",
+ seq, count, nodeId); strcat(theName, buf);
+ }
+ theLevelDepth = 2;
+ break;
+ }
+ break;
+ case 3:{
+ const Uint32 diskNo = FsOpenReq::v1_getDisk(filenumber);
+
+ if(diskNo == 0xFF){
+ ERROR_SET(ecError, AFS_ERROR_PARAMETER,"","Invalid disk specification");
+ }
+
+ snprintf(buf, sizeof(buf), "D%d%s", diskNo, DIR_SEPARATOR);
+ strcat(theName, buf);
+ theLevelDepth++;
+ }
+ break;
+ default:
+ ERROR_SET(ecError, AFS_ERROR_PARAMETER,"","Wrong version");
+ }
+ if (type >= noOfExtensions){
+ ERROR_SET(ecError, AFS_ERROR_PARAMETER,"","File Type doesn't exist");
+ return;
+ }
+ strcat(theName, fileExtension[type]);
+
+ if(dir == true){
+ for(Uint32 l = strlen(theName) - 1; l >= 0; l--){
+ if(theName[l] == DIR_SEPARATOR[0]){
+ theName[l] = 0;
+ break;
+ }
+ }
+ }
+}
+
+/**
+ * Find out directory name on level
+ * Ex:
+ * theName = "/tmp/fs/T0/NDBFS/D0/P0/S27.data"
+ * level = 1
+ * would return "/tmp/fs/T0/NDBFS/
+ */
+const char* Filename::directory(int level)
+{
+ const char* p;
+
+ p = theName;
+ p += strlen(theBaseDirectory);
+
+ for (int i = 0; i <= level; i++){
+ p = strstr(p, DIR_SEPARATOR);
+ p++;
+ }
+
+ strncpy(theDirectory, theName, p - theName - 1);
+ theDirectory[p-theName-1] = 0;
+ return theDirectory;
+}
+
+
diff --git a/ndb/src/kernel/blocks/ndbfs/Filename.hpp b/ndb/src/kernel/blocks/ndbfs/Filename.hpp
new file mode 100644
index 00000000000..4c3569b5485
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/Filename.hpp
@@ -0,0 +1,97 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef Filename_H
+#define Filename_H
+
+//===========================================================================
+//
+// .DESCRIPTION
+// Takes a 128 bits value (done as a array of four longs) and
+// makes a filename out of it acording the following schema
+// Bits 0-31 T
+// Bits 32-63 F
+// Bits 64-95 S
+// Bits 96-103 P
+// Bits 104-111 D
+// Bits 112-119 File Type
+// Bits 120-127 Version number of Filename
+//
+// T, is used to find/create a directory. If T = 0xFFFF then the
+// file is on top level. In that case the F is of no relevance.
+// F, same as T.
+// S, is used to find/create a filename. If S= 0xFFFF then it is ignored.
+// P, same as S
+// D, is used to find/create the root directory, this is the
+// directory before the blockname. If D= 0xFF then it is ignored.
+// File Type
+// 0 => .Data
+// 1 => .FragLog
+// 2 => .LocLog
+// 3 => .FragList
+// 4 => .TableList
+// 5 => .SchemaLog
+// 6 => .sysfile
+// 15=> ignored
+// Version number of Filename, current version is 0x1, must be
+// used for the this style of options.
+//
+//
+//===========================================================================
+
+#include <kernel_types.h>
+#include <NdbUnistd.h>
+
+class Filename
+{
+public:
+ // filenumber is 64 bits but is split in to 4 32bits words
+ Filename();
+ ~Filename();
+ void set(BlockReference blockReference,
+ const Uint32 filenumber[4], bool dir = false);
+ const char* baseDirectory() const;
+ const char* directory(int level);
+ int levels() const;
+ const char* c_str() const;
+
+ void init(const char * fileSystemPath);
+
+private:
+ int theLevelDepth;
+ char theName[PATH_MAX];
+ char theBaseDirectory[PATH_MAX];
+ char theDirectory[PATH_MAX];
+};
+
+// inline methods
+inline const char* Filename::c_str() const{
+ return theName;
+}
+
+inline const char* Filename::baseDirectory() const{
+ return theBaseDirectory;
+}
+
+inline int Filename::levels() const{
+ return theLevelDepth;
+}
+
+#endif
+
+
+
+
diff --git a/ndb/src/kernel/blocks/ndbfs/Makefile b/ndb/src/kernel/blocks/ndbfs/Makefile
new file mode 100644
index 00000000000..58e1458bf16
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/Makefile
@@ -0,0 +1,14 @@
+include .defs.mk
+
+TYPE := kernel
+
+ARCHIVE_TARGET := ndbfs
+
+SOURCES = \
+ AsyncFile.cpp \
+ Ndbfs.cpp VoidFs.cpp \
+ Filename.cpp \
+ CircularIndex.cpp
+
+include $(NDB_TOP)/Epilogue.mk
+
diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannel.cpp b/ndb/src/kernel/blocks/ndbfs/MemoryChannel.cpp
new file mode 100644
index 00000000000..a1aebdef7a1
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannel.cpp
@@ -0,0 +1,18 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+//#include "MemoryChannel.hpp"
+
diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp b/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp
new file mode 100644
index 00000000000..6e0c2721ca0
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannel.hpp
@@ -0,0 +1,168 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef MemoryChannel_H
+#define MemoryChannel_H
+
+//===========================================================================
+//
+// .DESCRIPTION
+// Pointer based communication channel for communication between two
+// thread. It does not copy any data in or out the channel so the
+// item that is put in can not be used untill the other thread has
+// given it back. There is no support for detecting the return of a
+// item. The channel is half-duplex.
+// For comminication between 1 writer and 1 reader use the MemoryChannel
+// class, for comminication between multiple writer and 1 reader use the
+// MemoryChannelMultipleWriter. There is no support for multiple readers.
+//
+// .TYPICAL USE:
+// to communicate between threads.
+//
+// .EXAMPLE:
+// See AsyncFile.C
+//===========================================================================
+//
+//
+// MemoryChannel( int size= 256);
+// Constuctor
+// Parameters:
+// size : amount of pointer it can hold
+//
+// void operator ++ ();
+// increments the index with one, if size is reached it is set to zero
+//
+// virtual void write( T *t);
+// Puts the item in the channel if the channel is full an error is reported.
+// Parameters:
+// t: pointer to item to put in the channel, after this the item
+// is shared with the other thread.
+// errors
+// AFS_ERROR_CHANNALFULL, channel is full
+//
+// T* read();
+// Reads a itemn from the channel, if channel is empty it blocks untill
+// an item can be read.
+// return
+// T : item from the channel
+//
+// T* tryRead();
+// Reads a item from the channel, if channel is empty it returns zero.
+// return
+// T : item from the channel or zero if channel is empty.
+//
+
+#if defined NDB_OSE || defined NDB_SOFTOSE
+#include "MemoryChannelOSE.hpp"
+#else
+
+#include "ErrorHandlingMacros.hpp"
+#include "Error.hpp"
+#include "CircularIndex.hpp"
+#include "NdbMutex.h"
+#include "NdbCondition.h"
+#include <NdbOut.hpp>
+
+#include <assert.h>
+
+template <class T>
+class MemoryChannel
+{
+public:
+ MemoryChannel( int size= 256);
+ virtual ~MemoryChannel( );
+
+ virtual void writeChannel( T *t);
+ T* readChannel();
+ T* tryReadChannel();
+
+private:
+ int theSize;
+ T **theChannel;
+ CircularIndex theWriteIndex;
+ CircularIndex theReadIndex;
+ NdbMutex* theMutexPtr;
+ NdbCondition* theConditionPtr;
+
+};
+
+
+template <class T> MemoryChannel<T>::MemoryChannel( int size):
+ theSize(size),
+ theChannel(new T*[size] ),
+ theWriteIndex(0, size),
+ theReadIndex(0, size)
+{
+ theMutexPtr = NdbMutex_Create();
+ theConditionPtr = NdbCondition_Create();
+}
+
+template <class T> MemoryChannel<T>::~MemoryChannel( )
+{
+ NdbMutex_Destroy(theMutexPtr);
+ NdbCondition_Destroy(theConditionPtr);
+ delete [] theChannel;
+}
+
+template <class T> void MemoryChannel<T>::writeChannel( T *t)
+{
+
+ NdbMutex_Lock(theMutexPtr);
+ REQUIRE(!full(theWriteIndex, theReadIndex), "Memory Channel Full");
+ REQUIRE(theChannel != NULL, "Memory Channel Full");
+ theChannel[theWriteIndex]= t;
+ ++theWriteIndex;
+ NdbMutex_Unlock(theMutexPtr);
+ NdbCondition_Signal(theConditionPtr);
+}
+
+
+template <class T> T* MemoryChannel<T>::readChannel()
+{
+ T* tmp;
+
+ NdbMutex_Lock(theMutexPtr);
+ while ( empty(theWriteIndex, theReadIndex) )
+ {
+ NdbCondition_Wait(theConditionPtr,
+ theMutexPtr);
+ }
+
+ tmp= theChannel[theReadIndex];
+ ++theReadIndex;
+ NdbMutex_Unlock(theMutexPtr);
+ return tmp;
+}
+
+template <class T> T* MemoryChannel<T>::tryReadChannel()
+{
+ T* tmp= 0;
+ NdbMutex_Lock(theMutexPtr);
+ NdbCondition_WaitTimeout(theConditionPtr,
+ theMutexPtr, 0);
+ if ( !empty(theWriteIndex, theReadIndex) )
+ {
+ tmp= theChannel[theReadIndex];
+ ++theReadIndex;
+ }
+ NdbMutex_Unlock(theMutexPtr);
+ return tmp;
+}
+
+#endif
+
+#endif // MemoryChannel_H
+
diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannelOSE.hpp b/ndb/src/kernel/blocks/ndbfs/MemoryChannelOSE.hpp
new file mode 100644
index 00000000000..9f70efcadf7
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannelOSE.hpp
@@ -0,0 +1,205 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef MemoryChannelOSE_H
+#define MemoryChannelOSE_H
+
+//===========================================================================
+//
+// .DESCRIPTION
+// Pointer based communication channel for communication between two
+// thread. It sends the pointer to the other signal via an OSE signal
+//
+// .TYPICAL USE:
+// to communicate between threads.
+//
+// .EXAMPLE:
+// See AsyncFile.C
+//===========================================================================
+//
+//
+// MemoryChannel( int size= 256);
+// Constuctor
+// Parameters:
+// size : is ignored in OSE version
+//
+// void operator ++ ();
+// increments the index with one, if size is reached it is set to zero
+//
+// virtual void write( T *t);
+// Puts the item in the channel if the channel is full an error is reported.
+// Parameters:
+// t: pointer to item to put in the channel, after this the item
+// is shared with the other thread.
+// errors
+// AFS_ERROR_CHANNALFULL, channel is full
+//
+// T* read();
+// Reads a itemn from the channel, if channel is empty it blocks untill
+// an item can be read.
+// return
+// T : item from the channel
+//
+// T* tryRead();
+// Reads a item from the channel, if channel is empty it returns zero.
+// return
+// T : item from the channel or zero if channel is empty.
+//
+
+#include <ose.h>
+#include "ErrorHandlingMacros.hpp"
+#include "Error.hpp"
+#include "NdbMutex.h"
+#include "NdbCondition.h"
+
+#include <assert.h>
+
+
+
+
+template <class T>
+class MemoryChannel
+{
+public:
+ MemoryChannel( int size= 256);
+ virtual ~MemoryChannel( );
+
+ virtual void writeChannel( T *t);
+ T* readChannel();
+ T* tryReadChannel();
+
+private:
+ PROCESS theReceiverPid;
+};
+
+template <class T> class MemoryChannelMultipleWriter:public MemoryChannel<T>
+{
+public:
+ MemoryChannelMultipleWriter( int size= 256);
+ ~MemoryChannelMultipleWriter( );
+ void writeChannel( T *t);
+
+private:
+};
+
+
+#define MEMCHANNEL_SIGBASE 5643
+
+#define MEMCHANNEL_SIGNAL (MEMCHANNEL_SIGBASE + 1) /* !-SIGNO(struct MemChannelSignal)-! */
+
+
+struct MemChannelSignal
+{
+ SIGSELECT sigNo;
+ void* ptr;
+};
+
+union SIGNAL
+{
+ SIGSELECT sigNo;
+ struct MemChannelSignal memChanSig;
+};
+
+template <class T> MemoryChannel<T>::MemoryChannel( int size )
+{
+ // Default receiver for this channel is the creating process
+ theReceiverPid = current_process();
+}
+
+template <class T> MemoryChannel<T>::~MemoryChannel( )
+{
+}
+
+template <class T> void MemoryChannel<T>::writeChannel( T *t)
+{
+ union SIGNAL* sig;
+
+ sig = alloc(sizeof(struct MemChannelSignal), MEMCHANNEL_SIGNAL);
+ ((struct MemChannelSignal*)sig)->ptr = t;
+ send(&sig, theReceiverPid);
+}
+
+
+template <class T> T* MemoryChannel<T>::readChannel()
+{
+ T* tmp;
+
+ static const SIGSELECT sel_mem[] = {1, MEMCHANNEL_SIGNAL};
+ union SIGNAL* sig;
+
+ tmp = NULL; /* Default value */
+
+ sig = receive((SIGSELECT*)sel_mem);
+ if (sig != NIL){
+ if (sig->sigNo == MEMCHANNEL_SIGNAL){
+ tmp = (T*)(((struct MemChannelSignal*)sig)->ptr);
+ }else{
+ assert(1==0);
+ }
+ free_buf(&sig);
+ }
+
+ return tmp;
+}
+
+template <class T> T* MemoryChannel<T>::tryReadChannel()
+{
+ T* tmp;
+
+ static const SIGSELECT sel_mem[] = {1, MEMCHANNEL_SIGNAL};
+ union SIGNAL* sig;
+
+ tmp = NULL; /* Default value */
+
+ sig = receive_w_tmo(0, (SIGSELECT*)sel_mem);
+ if (sig != NIL){
+ if (sig->sigNo == MEMCHANNEL_SIGNAL){
+ tmp = (T*)(((struct MemChannelSignal*)sig)->ptr);
+ }else{
+ assert(1==0);
+ }
+ free_buf(&sig);
+ }
+
+ return tmp;
+}
+
+
+#endif // MemoryChannel_H
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/Makefile b/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/Makefile
new file mode 100644
index 00000000000..68f71bfc4cd
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/Makefile
@@ -0,0 +1,13 @@
+include .defs.mk
+
+TYPE := kernel
+
+BIN_TARGET := mctest
+BIN_TARGET_ARCHIVES := portlib
+
+SOURCES = MemoryChannelTest.cpp
+
+CFLAGS_MemoryChannelTest.cpp = -I../
+
+include $(NDB_TOP)/Epilogue.mk
+
diff --git a/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/MemoryChannelTest.cpp b/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/MemoryChannelTest.cpp
new file mode 100644
index 00000000000..aeab9f7828d
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/MemoryChannelTest/MemoryChannelTest.cpp
@@ -0,0 +1,197 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "MemoryChannel.hpp"
+#include "NdbThread.h"
+#include "NdbSleep.h"
+#include "NdbOut.hpp"
+#include "NdbMain.h"
+
+
+
+MemoryChannel<int>* theMemoryChannel;
+
+
+extern "C" void* runProducer(void*arg)
+{
+ // The producer will items into the MemoryChannel
+ int count = *(int*)arg;
+ int* p;
+ int i = 0;
+ while (i <= count)
+ {
+ p = new int(i);
+ ndbout << "P: " << *p << endl;
+ theMemoryChannel->writeChannel(p);
+ if (i%5==0)
+ NdbSleep_MilliSleep(i);
+ i++;
+ }
+ NdbThread_Exit(0);
+ return NULL;
+}
+
+extern "C" void* runConsumer(void* arg)
+{
+ // The producer will read items from MemoryChannel and print on screen
+ int count = *(int*)arg;
+ int* p;
+ int i = 0;
+ while (i < count)
+ {
+ p = theMemoryChannel->readChannel();
+ ndbout << "C: " << *p << endl;
+ i = *p;
+ delete p;
+
+ }
+ NdbThread_Exit(0);
+ return NULL;
+}
+
+
+
+class ArgStruct
+{
+public:
+ ArgStruct(int _items, int _no){
+ items=_items;
+ no=_no;
+ };
+ int items;
+ int no;
+};
+
+MemoryChannelMultipleWriter<ArgStruct>* theMemoryChannel2;
+
+extern "C" void* runProducer2(void*arg)
+{
+ // The producer will items into the MemoryChannel
+ ArgStruct* pArg = (ArgStruct*)arg;
+ int count = pArg->items;
+ ArgStruct* p;
+ int i = 0;
+ while (i < count)
+ {
+ p = new ArgStruct(i, pArg->no);
+ ndbout << "P"<<pArg->no<<": " << i << endl;
+ theMemoryChannel2->writeChannel(p);
+ NdbSleep_MilliSleep(i);
+ i++;
+ }
+ NdbThread_Exit(0);
+ return NULL;
+}
+
+extern "C" void* runConsumer2(void* arg)
+{
+ // The producer will read items from MemoryChannel and print on screen
+ ArgStruct* pArg = (ArgStruct*)arg;
+ int count = pArg->items * pArg->no;
+ ArgStruct* p;
+ int i = 0;
+ while (i < count)
+ {
+ p = theMemoryChannel2->readChannel();
+ ndbout << "C: "<< p->no << ", " << p->items << endl;
+ i++;
+ delete p;
+ }
+ ndbout << "Consumer2: " << count << " received" << endl;
+ NdbThread_Exit(0);
+ return NULL;
+}
+
+
+
+
+//#if defined MEMORYCHANNELTEST
+
+//int main(int argc, char **argv)
+NDB_COMMAND(mctest, "mctest", "mctest", "Test the memory channel used in Ndb", 32768)
+{
+
+ ndbout << "==== testing MemoryChannel ====" << endl;
+
+ theMemoryChannel = new MemoryChannel<int>;
+ theMemoryChannel2 = new MemoryChannelMultipleWriter<ArgStruct>;
+
+ NdbThread* consumerThread;
+ NdbThread* producerThread;
+
+ NdbThread_SetConcurrencyLevel(2);
+
+ int numItems = 100;
+ producerThread = NdbThread_Create(runProducer,
+ (void**)&numItems,
+ 4096,
+ (char*)"producer");
+
+ consumerThread = NdbThread_Create(runConsumer,
+ (void**)&numItems,
+ 4096,
+ (char*)"consumer");
+
+
+ void *status;
+ NdbThread_WaitFor(consumerThread, &status);
+ NdbThread_WaitFor(producerThread, &status);
+
+ ndbout << "==== testing MemoryChannelMultipleWriter ====" << endl;
+#define NUM_THREADS2 5
+ NdbThread_SetConcurrencyLevel(NUM_THREADS2+2);
+ NdbThread* producerThreads[NUM_THREADS2];
+
+ ArgStruct *pArg;
+ for (int j = 0; j < NUM_THREADS2; j++)
+ {
+ char buf[25];
+ sprintf((char*)&buf, "producer%d", j);
+ pArg = new ArgStruct(numItems, j);
+ producerThreads[j] = NdbThread_Create(runProducer2,
+ (void**)pArg,
+ 4096,
+ (char*)&buf);
+ }
+
+ pArg = new ArgStruct(numItems, NUM_THREADS2);
+ consumerThread = NdbThread_Create(runConsumer2,
+ (void**)pArg,
+ 4096,
+ (char*)"consumer");
+
+
+ NdbThread_WaitFor(consumerThread, &status);
+ for (int j = 0; j < NUM_THREADS2; j++)
+ {
+ NdbThread_WaitFor(producerThreads[j], &status);
+ }
+
+
+ return 0;
+
+}
+
+void ErrorReporter::handleError(ErrorCategory type, int messageID,
+ const char* problemData, const char* objRef,
+ NdbShutdownType nst)
+{
+
+ ndbout << "ErrorReporter::handleError activated" << endl;
+ exit(1);
+}
+
+//#endif
diff --git a/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp b/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
new file mode 100644
index 00000000000..8992a2104e9
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
@@ -0,0 +1,1008 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <limits.h>
+#include <errno.h>
+
+#include "Ndbfs.hpp"
+#include "AsyncFile.hpp"
+#include "Filename.hpp"
+#include "Error.hpp"
+
+#include <signaldata/FsOpenReq.hpp>
+#include <signaldata/FsCloseReq.hpp>
+#include <signaldata/FsReadWriteReq.hpp>
+#include <signaldata/FsAppendReq.hpp>
+#include <signaldata/FsRemoveReq.hpp>
+#include <signaldata/FsConf.hpp>
+#include <signaldata/FsRef.hpp>
+#include <signaldata/NdbfsContinueB.hpp>
+#include <signaldata/DumpStateOrd.hpp>
+
+#include <RefConvert.hpp>
+#include <NdbSleep.h>
+#include <NdbOut.hpp>
+#include <Configuration.hpp>
+
+#define DEBUG(x) { ndbout << "FS::" << x << endl; }
+
+inline
+int pageSize( const NewVARIABLE* baseAddrRef )
+{
+ int log_psize;
+ int log_qsize = baseAddrRef->bits.q;
+ int log_vsize = baseAddrRef->bits.v;
+ if (log_vsize < 3)
+ log_vsize = 3;
+ log_psize = log_qsize + log_vsize - 3;
+ return (1 << log_psize);
+}
+
+
+Ndbfs::Ndbfs(const Configuration & conf) :
+ SimulatedBlock(NDBFS, conf),
+ scanningInProgress(false),
+ theLastId(0),
+ m_maxOpenedFiles(0)
+{
+ theFileSystemPath = conf.fileSystemPath();
+ theRequestPool = new Pool<Request>;
+
+ const Properties * p = conf.getOwnProperties();
+ ndbrequire(p != 0);
+
+ ndbrequire(p->get("MaxNoOfOpenFiles", &m_maxFiles));
+
+ // Create idle AsyncFiles
+ Uint32 noIdleFiles = 16;
+ for (Uint32 i = 0; i < noIdleFiles; i++){
+ theIdleFiles.push_back(createAsyncFile());
+ }
+
+ BLOCK_CONSTRUCTOR(Ndbfs);
+
+ // Set received signals
+ addRecSignal(GSN_DUMP_STATE_ORD, &Ndbfs::execDUMP_STATE_ORD);
+ addRecSignal(GSN_STTOR, &Ndbfs::execSTTOR);
+ addRecSignal(GSN_FSOPENREQ, &Ndbfs::execFSOPENREQ);
+ addRecSignal(GSN_FSCLOSEREQ, &Ndbfs::execFSCLOSEREQ);
+ addRecSignal(GSN_FSWRITEREQ, &Ndbfs::execFSWRITEREQ);
+ addRecSignal(GSN_FSREADREQ, &Ndbfs::execFSREADREQ);
+ addRecSignal(GSN_FSSYNCREQ, &Ndbfs::execFSSYNCREQ);
+ addRecSignal(GSN_CONTINUEB, &Ndbfs::execCONTINUEB);
+ addRecSignal(GSN_FSAPPENDREQ, &Ndbfs::execFSAPPENDREQ);
+ addRecSignal(GSN_FSREMOVEREQ, &Ndbfs::execFSREMOVEREQ);
+ // Set send signals
+}
+
+Ndbfs::~Ndbfs()
+{
+ // Delete all files
+ // AsyncFile destuctor will take care of deleting
+ // the thread it has created
+ for (unsigned i = 0; i < theFiles.size(); i++){
+ AsyncFile* file = theFiles[i];
+ delete file;
+ theFiles[i] = NULL;
+ }//for
+ theFiles.clear();
+
+ delete theRequestPool;
+}
+
+/* Received a restart signal.
+ * Answer it like any other block
+ * PR0 : StartCase
+ * DR0 : StartPhase
+ * DR1 : ?
+ * DR2 : ?
+ * DR3 : ?
+ * DR4 : ?
+ * DR5 : SignalKey
+ */
+void
+Ndbfs::execSTTOR(Signal* signal)
+{
+ jamEntry();
+
+ if(signal->theData[1] == 0){ // StartPhase 0
+ jam();
+ cownref = NDBFS_REF;
+ // close all open files
+ ndbrequire(theOpenFiles.size() == 0);
+
+ scanningInProgress = false;
+
+ signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
+ sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 1);
+
+ signal->theData[3] = 255;
+ sendSignal(NDBCNTR_REF, GSN_STTORRY, signal,4, JBB);
+ return;
+ }
+ ndbrequire(0);
+}
+
+int
+Ndbfs::forward( AsyncFile * file, Request* request)
+{
+ jam();
+ file->execute(request);
+ return 1;
+}
+
+void
+Ndbfs::execFSOPENREQ(Signal* signal)
+{
+ jamEntry();
+ const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
+ const BlockReference userRef = fsOpenReq->userReference;
+ AsyncFile* file = getIdleFile();
+ ndbrequire(file != NULL);
+ ndbrequire(signal->getLength() == FsOpenReq::SignalLength)
+ file->theFileName.set( userRef, fsOpenReq->fileNumber);
+ file->reportTo(&theFromThreads);
+
+ Request* request = theRequestPool->get();
+ request->action = Request::open;
+ request->error = 0;
+ request->par.open.flags = fsOpenReq->fileFlags;
+ request->set(userRef, fsOpenReq->userPointer, newId() );
+ request->file = file;
+ request->theTrace = signal->getTrace();
+
+ ndbrequire(forward(file, request));
+}
+
+void
+Ndbfs::execFSREMOVEREQ(Signal* signal)
+{
+ jamEntry();
+ const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
+ const BlockReference userRef = req->userReference;
+ AsyncFile* file = getIdleFile();
+ ndbrequire(file != NULL);
+
+ file->theFileName.set( userRef, req->fileNumber, req->directory);
+ file->reportTo(&theFromThreads);
+
+ Request* request = theRequestPool->get();
+ request->action = Request::rmrf;
+ request->par.rmrf.directory = req->directory;
+ request->par.rmrf.own_directory = req->ownDirectory;
+ request->error = 0;
+ request->set(userRef, req->userPointer, newId() );
+ request->file = file;
+ request->theTrace = signal->getTrace();
+
+ ndbrequire(forward(file, request));
+}
+
+/*
+ * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1
+ * remove file
+ */
+void
+Ndbfs::execFSCLOSEREQ(Signal * signal)
+{
+ jamEntry();
+ const FsCloseReq * const fsCloseReq = (FsCloseReq *)&signal->theData[0];
+ const BlockReference userRef = fsCloseReq->userReference;
+ const Uint16 filePointer = (Uint16)fsCloseReq->filePointer;
+ const UintR userPointer = fsCloseReq->userPointer;
+
+ AsyncFile* openFile = theOpenFiles.find(filePointer);
+ if (openFile == NULL) {
+ // The file was not open, send error back to sender
+ jam();
+ // Initialise FsRef signal
+ FsRef * const fsRef = (FsRef *)&signal->theData[0];
+ fsRef->userPointer = userPointer;
+ fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
+ fsRef->osErrorCode = ~0; // Indicate local error
+ sendSignal(userRef, GSN_FSCLOSEREF, signal, 3, JBB);
+ return;
+ }
+
+ Request *request = theRequestPool->get();
+ if( fsCloseReq->getRemoveFileFlag(fsCloseReq->fileFlag) == true ) {
+ jam();
+ request->action = Request::closeRemove;
+ } else {
+ jam();
+ request->action = Request::close;
+ }
+ request->set(userRef, fsCloseReq->userPointer, filePointer);
+ request->file = openFile;
+ request->error = 0;
+ request->theTrace = signal->getTrace();
+
+ ndbrequire(forward(openFile, request));
+}
+
+void
+Ndbfs::readWriteRequest(int action, Signal * signal)
+{
+ const FsReadWriteReq * const fsRWReq = (FsReadWriteReq *)&signal->theData[0];
+ Uint16 filePointer = (Uint16)fsRWReq->filePointer;
+ const UintR userPointer = fsRWReq->userPointer;
+ const BlockReference userRef = fsRWReq->userReference;
+ const BlockNumber blockNumber = refToBlock(userRef);
+
+ AsyncFile* openFile = theOpenFiles.find(filePointer);
+
+ const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex];
+ unsigned int tPageSize;
+ unsigned int tClusterSize;
+ unsigned int tNRR;
+ unsigned int tPageOffset;
+ char* tWA;
+ FsRef::NdbfsErrorCodeType errorCode;
+
+ Request *request = theRequestPool->get();
+ request->error = 0;
+ request->set(userRef, userPointer, filePointer);
+ request->file = openFile;
+ request->action = (Request::Action) action;
+ request->theTrace = signal->getTrace();
+
+ if (fsRWReq->numberOfPages == 0) { //Zero pages not allowed
+ jam();
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }
+
+ if (fsRWReq->varIndex >= getBatSize(blockNumber)) {
+ jam();// Ensure that a valid variable is used
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }
+ if (myBaseAddrRef == NULL) {
+ jam(); // Ensure that a valid variable is used
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }
+ if (openFile == NULL) {
+ jam(); //file not open
+ errorCode = FsRef::fsErrFileDoesNotExist;
+ goto error;
+ }
+ tPageSize = pageSize(myBaseAddrRef);
+ tClusterSize = myBaseAddrRef->ClusterSize;
+ tNRR = myBaseAddrRef->nrr;
+ tWA = (char*)myBaseAddrRef->WA;
+
+ switch (fsRWReq->getFormatFlag(fsRWReq->operationFlag)) {
+
+ // List of memory and file pages pairs
+ case FsReadWriteReq::fsFormatListOfPairs: {
+ jam();
+ for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
+ jam();
+ const Uint32 varIndex = fsRWReq->data.listOfPair[i].varIndex;
+ const Uint32 fileOffset = fsRWReq->data.listOfPair[i].fileOffset;
+ if (varIndex >= tNRR) {
+ jam();
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }//if
+ request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
+ request->par.readWrite.pages[i].size = tPageSize;
+ request->par.readWrite.pages[i].offset = fileOffset * tPageSize;
+ }//for
+ request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
+ break;
+ }//case
+
+ // Range of memory page with one file page
+ case FsReadWriteReq::fsFormatArrayOfPages: {
+ if ((fsRWReq->numberOfPages + fsRWReq->data.arrayOfPages.varIndex) > tNRR) {
+ jam();
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }//if
+ const Uint32 varIndex = fsRWReq->data.arrayOfPages.varIndex;
+ const Uint32 fileOffset = fsRWReq->data.arrayOfPages.fileOffset;
+
+ request->par.readWrite.pages[0].offset = fileOffset * tPageSize;
+ request->par.readWrite.pages[0].size = tPageSize * fsRWReq->numberOfPages;
+ request->par.readWrite.numberOfPages = 1;
+ request->par.readWrite.pages[0].buf = &tWA[varIndex * tPageSize];
+ break;
+ }//case
+
+ // List of memory pages followed by one file page
+ case FsReadWriteReq::fsFormatListOfMemPages: {
+
+ tPageOffset = fsRWReq->data.listOfMemPages.varIndex[fsRWReq->numberOfPages];
+ tPageOffset *= tPageSize;
+
+ for (unsigned int i = 0; i < fsRWReq->numberOfPages; i++) {
+ jam();
+ Uint32 varIndex = fsRWReq->data.listOfMemPages.varIndex[i];
+
+ if (varIndex >= tNRR) {
+ jam();
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }//if
+ request->par.readWrite.pages[i].buf = &tWA[varIndex * tClusterSize];
+ request->par.readWrite.pages[i].size = tPageSize;
+ request->par.readWrite.pages[i].offset = tPageOffset + (i*tPageSize);
+ }//for
+ request->par.readWrite.numberOfPages = fsRWReq->numberOfPages;
+ break;
+ // make it a writev or readv
+ }//case
+
+ default: {
+ jam();
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }//default
+
+ }//switch
+
+ ndbrequire(forward(openFile, request));
+ return;
+
+error:
+ theRequestPool->put(request);
+ FsRef * const fsRef = (FsRef *)&signal->theData[0];
+ fsRef->userPointer = userPointer;
+ fsRef->setErrorCode(fsRef->errorCode, errorCode);
+ fsRef->osErrorCode = ~0; // Indicate local error
+ switch (action) {
+ case Request:: write:
+ case Request:: writeSync: {
+ jam();
+ sendSignal(userRef, GSN_FSWRITEREF, signal, 3, JBB);
+ break;
+ }//case
+ case Request:: read: {
+ jam();
+ sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);
+ }//case
+ }//switch
+ return;
+}
+
+/*
+ PR0: File Pointer , theData[0]
+ DR0: User reference, theData[1]
+ DR1: User Pointer, etc.
+ DR2: Flag
+ DR3: Var number
+ DR4: amount of pages
+ DR5->: Memory Page id and File page id according to Flag
+*/
+void
+Ndbfs::execFSWRITEREQ(Signal* signal)
+{
+ jamEntry();
+ const FsReadWriteReq * const fsWriteReq = (FsReadWriteReq *)&signal->theData[0];
+
+ if (fsWriteReq->getSyncFlag(fsWriteReq->operationFlag) == true){
+ jam();
+ readWriteRequest( Request::writeSync, signal );
+ } else {
+ jam();
+ readWriteRequest( Request::write, signal );
+ }
+}
+
+/*
+ PR0: File Pointer
+ DR0: User reference
+ DR1: User Pointer
+ DR2: Flag
+ DR3: Var number
+ DR4: amount of pages
+ DR5->: Memory Page id and File page id according to Flag
+*/
+void
+Ndbfs::execFSREADREQ(Signal* signal)
+{
+ jamEntry();
+ readWriteRequest( Request::read, signal );
+}
+
+/*
+ * PR0: File Pointer DR0: User reference DR1: User Pointer
+ */
+void
+Ndbfs::execFSSYNCREQ(Signal * signal)
+{
+ jamEntry();
+ Uint16 filePointer = (Uint16)signal->theData[0];
+ BlockReference userRef = signal->theData[1];
+ const UintR userPointer = signal->theData[2];
+ AsyncFile* openFile = theOpenFiles.find(filePointer);
+
+ if (openFile == NULL) {
+ jam(); //file not open
+ FsRef * const fsRef = (FsRef *)&signal->theData[0];
+ fsRef->userPointer = userPointer;
+ fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrFileDoesNotExist);
+ fsRef->osErrorCode = ~0; // Indicate local error
+ sendSignal(userRef, GSN_FSSYNCREF, signal, 3, JBB);
+ return;
+ }
+
+ Request *request = theRequestPool->get();
+ request->error = 0;
+ request->action = Request::sync;
+ request->set(userRef, userPointer, filePointer);
+ request->file = openFile;
+ request->theTrace = signal->getTrace();
+
+ ndbrequire(forward(openFile,request));
+}
+
+void
+Ndbfs::execFSAPPENDREQ(Signal * signal)
+{
+ const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0];
+ const Uint16 filePointer = (Uint16)fsReq->filePointer;
+ const UintR userPointer = fsReq->userPointer;
+ const BlockReference userRef = fsReq->userReference;
+ const BlockNumber blockNumber = refToBlock(userRef);
+
+ FsRef::NdbfsErrorCodeType errorCode;
+
+ AsyncFile* openFile = theOpenFiles.find(filePointer);
+ const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex];
+
+ const Uint32* tWA = (const Uint32*)myBaseAddrRef->WA;
+ const Uint32 tSz = myBaseAddrRef->nrr;
+ const Uint32 offset = fsReq->offset;
+ const Uint32 size = fsReq->size;
+ Request *request = theRequestPool->get();
+
+ if (openFile == NULL) {
+ jam();
+ errorCode = FsRef::fsErrFileDoesNotExist;
+ goto error;
+ }
+
+ if (myBaseAddrRef == NULL) {
+ jam(); // Ensure that a valid variable is used
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }
+
+ if (fsReq->varIndex >= getBatSize(blockNumber)) {
+ jam();// Ensure that a valid variable is used
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }
+
+ if(offset + size > tSz){
+ jam(); // Ensure that a valid variable is used
+ errorCode = FsRef::fsErrInvalidParameters;
+ goto error;
+ }
+
+ request->error = 0;
+ request->set(userRef, userPointer, filePointer);
+ request->file = openFile;
+ request->action = Request::append;
+ request->theTrace = signal->getTrace();
+
+ request->par.append.buf = (const char *)(tWA + offset);
+ request->par.append.size = size << 2;
+
+ ndbrequire(forward(openFile, request));
+ return;
+
+error:
+ jam();
+ theRequestPool->put(request);
+ FsRef * const fsRef = (FsRef *)&signal->theData[0];
+ fsRef->userPointer = userPointer;
+ fsRef->setErrorCode(fsRef->errorCode, errorCode);
+ fsRef->osErrorCode = ~0; // Indicate local error
+
+ jam();
+ sendSignal(userRef, GSN_FSAPPENDREF, signal, 3, JBB);
+ return;
+}
+
+Uint16
+Ndbfs::newId()
+{
+ // finds a new key, eg a new filepointer
+ for (int i = 1; i < SHRT_MAX; i++)
+ {
+ if (theLastId == SHRT_MAX) {
+ jam();
+ theLastId = 1;
+ } else {
+ jam();
+ theLastId++;
+ }
+
+ if(theOpenFiles.find(theLastId) == NULL) {
+ jam();
+ return theLastId;
+ }
+ }
+ ndbrequire(1 == 0);
+ // The program will not reach this point
+ return 0;
+}
+
+AsyncFile*
+Ndbfs::createAsyncFile(){
+
+ // Check limit of open files
+ if (theFiles.size()+1 == m_maxFiles) {
+ // Print info about all open files
+ for (unsigned i = 0; i < theFiles.size(); i++){
+ AsyncFile* file = theFiles[i];
+ ndbout_c("%2d (0x%x): %s", i, file, file->isOpen()?"OPEN":"CLOSED");
+ }
+ ERROR_SET(fatal, AFS_ERROR_MAXOPEN,""," Ndbfs::createAsyncFile");
+ }
+
+ AsyncFile* file = new AsyncFile;
+ file->doStart(theFileSystemPath);
+
+ // Put the file in list of all files
+ theFiles.push_back(file);
+
+#ifdef VM_TRACE
+ infoEvent("NDBFS: Created new file thread %d", theFiles.size());
+#endif
+
+ return file;
+}
+
+AsyncFile*
+Ndbfs::getIdleFile(){
+ AsyncFile* file;
+ if (theIdleFiles.size() > 0){
+ file = theIdleFiles[0];
+ theIdleFiles.erase(0);
+ } else {
+ file = createAsyncFile();
+ }
+ return file;
+}
+
+
+
+void
+Ndbfs::report(Request * request, Signal* signal)
+{
+ const Uint32 orgTrace = signal->getTrace();
+ signal->setTrace(request->theTrace);
+ const BlockReference ref = request->theUserReference;
+ if (request->error) {
+ jam();
+ // Initialise FsRef signal
+ FsRef * const fsRef = (FsRef *)&signal->theData[0];
+ fsRef->userPointer = request->theUserPointer;
+ fsRef->setErrorCode(fsRef->errorCode, translateErrno(request->error));
+ fsRef->osErrorCode = request->error;
+
+ switch (request->action) {
+ case Request:: open: {
+ jam();
+ // Put the file back in idle files list
+ theIdleFiles.push_back(request->file);
+ sendSignal(ref, GSN_FSOPENREF, signal, FsRef::SignalLength, JBB);
+ break;
+ }
+ case Request:: closeRemove:
+ case Request:: close: {
+ jam();
+ sendSignal(ref, GSN_FSCLOSEREF, signal, FsRef::SignalLength, JBB);
+ break;
+ }
+ case Request:: writeSync:
+ case Request:: writevSync:
+ case Request:: write:
+ case Request:: writev: {
+ jam();
+ sendSignal(ref, GSN_FSWRITEREF, signal, FsRef::SignalLength, JBB);
+ break;
+ }
+ case Request:: read:
+ case Request:: readv: {
+ jam();
+ sendSignal(ref, GSN_FSREADREF, signal, FsRef::SignalLength, JBB);
+ break;
+ }
+ case Request:: sync: {
+ jam();
+ sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
+ break;
+ }
+ case Request::append: {
+ jam();
+ sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
+ break;
+ }
+ case Request::rmrf: {
+ jam();
+ // Put the file back in idle files list
+ theIdleFiles.push_back(request->file);
+ sendSignal(ref, GSN_FSREMOVEREF, signal, FsRef::SignalLength, JBB);
+ break;
+ }
+
+ case Request:: end: {
+ // Report nothing
+ break;
+ }
+ }//switch
+ } else {
+ jam();
+ FsConf * const fsConf = (FsConf *)&signal->theData[0];
+ fsConf->userPointer = request->theUserPointer;
+ switch (request->action) {
+ case Request:: open: {
+ jam();
+ theOpenFiles.insert(request->file, request->theFilePointer);
+
+ // Keep track on max number of opened files
+ if (theOpenFiles.size() > m_maxOpenedFiles)
+ m_maxOpenedFiles = theOpenFiles.size();
+
+ fsConf->filePointer = request->theFilePointer;
+ sendSignal(ref, GSN_FSOPENCONF, signal, 3, JBB);
+ break;
+ }
+ case Request:: closeRemove:
+ case Request:: close: {
+ jam();
+ // removes the file from OpenFiles list
+ theOpenFiles.erase(request->theFilePointer);
+ // Put the file in idle files list
+ theIdleFiles.push_back(request->file);
+ sendSignal(ref, GSN_FSCLOSECONF, signal, 1, JBB);
+ break;
+ }
+ case Request:: writeSync:
+ case Request:: writevSync:
+ case Request:: write:
+ case Request:: writev: {
+ jam();
+ sendSignal(ref, GSN_FSWRITECONF, signal, 1, JBB);
+ break;
+ }
+ case Request:: read:
+ case Request:: readv: {
+ jam();
+ sendSignal(ref, GSN_FSREADCONF, signal, 1, JBB);
+ break;
+ }
+ case Request:: sync: {
+ jam();
+ sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB);
+ break;
+ }//case
+ case Request::append: {
+ jam();
+ signal->theData[1] = request->par.append.size;
+ sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB);
+ break;
+ }
+ case Request::rmrf: {
+ jam();
+ // Put the file in idle files list
+ theIdleFiles.push_back(request->file);
+ sendSignal(ref, GSN_FSREMOVECONF, signal, 1, JBB);
+ break;
+ }
+ case Request:: end: {
+ // Report nothing
+ break;
+ }
+ }
+ }//if
+ signal->setTrace(orgTrace);
+}
+
+
+bool
+Ndbfs::scanIPC(Signal* signal)
+{
+ Request* request = theFromThreads.tryReadChannel();
+ jam();
+ if (request) {
+ jam();
+ report(request, signal);
+ theRequestPool->put(request);
+ return &request;
+ }
+ return false;
+}
+
+#if defined NDB_WIN32
+int Ndbfs::translateErrno(int aErrno)
+{
+ switch (aErrno)
+ {
+ //permission denied
+ case ERROR_ACCESS_DENIED:
+
+ return FsRef::fsErrPermissionDenied;
+ //temporary not accessible
+ case ERROR_PATH_BUSY:
+ case ERROR_NO_MORE_SEARCH_HANDLES:
+
+ return FsRef::fsErrTemporaryNotAccessible;
+ //no space left on device
+ case ERROR_HANDLE_DISK_FULL:
+ case ERROR_DISK_FULL:
+
+ return FsRef::fsErrNoSpaceLeftOnDevice;
+ //none valid parameters
+ case ERROR_INVALID_HANDLE:
+ case ERROR_INVALID_DRIVE:
+ case ERROR_INVALID_ACCESS:
+ case ERROR_HANDLE_EOF:
+ case ERROR_BUFFER_OVERFLOW:
+
+ return FsRef::fsErrInvalidParameters;
+ //environment error
+ case ERROR_CRC:
+ case ERROR_ARENA_TRASHED:
+ case ERROR_BAD_ENVIRONMENT:
+ case ERROR_INVALID_BLOCK:
+ case ERROR_WRITE_FAULT:
+ case ERROR_READ_FAULT:
+ case ERROR_OPEN_FAILED:
+
+ return FsRef::fsErrEnvironmentError;
+
+ //no more process resources
+ case ERROR_TOO_MANY_OPEN_FILES:
+ case ERROR_NOT_ENOUGH_MEMORY:
+ case ERROR_OUTOFMEMORY:
+ return FsRef::fsErrNoMoreResources;
+ //no file
+ case ERROR_FILE_NOT_FOUND:
+ return FsRef::fsErrFileDoesNotExist;
+
+ case ERR_ReadUnderflow:
+ return FsRef::fsErrReadUnderflow;
+
+ default:
+ return FsRef::fsErrUnknown;
+ }
+}
+#elif defined NDB_OSE || defined NDB_SOFTOSE
+int Ndbfs::translateErrno(int aErrno)
+{
+ switch (aErrno)
+ {
+ //permission denied
+ case EACCES:
+ case EROFS:
+ case ENXIO:
+ return FsRef::fsErrPermissionDenied;
+ //temporary not accessible
+ case EAGAIN:
+ case ETIMEDOUT:
+ case ENOLCK:
+ return FsRef::fsErrTemporaryNotAccessible;
+ //no space left on device
+ case ENFILE:
+ case EDQUOT:
+ case ENOSPC:
+ return FsRef::fsErrNoSpaceLeftOnDevice;
+ //none valid parameters
+ case EINVAL:
+ case EFBIG:
+ case EBADF:
+ case ENAMETOOLONG:
+ case EFAULT:
+ case EISDIR:
+ return FsRef::fsErrInvalidParameters;
+ //environment error
+ case EMLINK:
+ case ELOOP:
+ return FsRef::fsErrEnvironmentError;
+
+ //no more process resources
+ case EMFILE:
+ case ENOMEM:
+ return FsRef::fsErrNoMoreResources;
+ //no file
+ case ENOENT:
+ return FsRef::fsErrFileDoesNotExist;
+
+ case ERR_ReadUnderflow:
+ return FsRef::fsErrReadUnderflow;
+
+ default:
+ return FsRef::fsErrUnknown;
+ }
+}
+#else
+int Ndbfs::translateErrno(int aErrno)
+{
+ switch (aErrno)
+ {
+ //permission denied
+ case EACCES:
+ case EROFS:
+ case ENXIO:
+ return FsRef::fsErrPermissionDenied;
+ //temporary not accessible
+ case EAGAIN:
+ case ETIMEDOUT:
+ case ENOLCK:
+ case EINTR:
+ case EIO:
+ return FsRef::fsErrTemporaryNotAccessible;
+ //no space left on device
+ case ENFILE:
+ case EDQUOT:
+#ifndef NDB_MACOSX
+ case ENOSR:
+#endif
+ case ENOSPC:
+ case EFBIG:
+ return FsRef::fsErrNoSpaceLeftOnDevice;
+ //none valid parameters
+ case EINVAL:
+ case EBADF:
+ case ENAMETOOLONG:
+ case EFAULT:
+ case EISDIR:
+ case ENOTDIR:
+ case EEXIST:
+ case ETXTBSY:
+ return FsRef::fsErrInvalidParameters;
+ //environment error
+ case ELOOP:
+#ifndef NDB_MACOSX
+ case ENOLINK:
+ case EMULTIHOP:
+#endif
+#ifndef NDB_LINUX
+ case EOPNOTSUPP:
+ case ESPIPE:
+#endif
+ case EPIPE:
+ return FsRef::fsErrEnvironmentError;
+
+ //no more process resources
+ case EMFILE:
+ case ENOMEM:
+ return FsRef::fsErrNoMoreResources;
+ //no file
+ case ENOENT:
+ return FsRef::fsErrFileDoesNotExist;
+
+ case ERR_ReadUnderflow:
+ return FsRef::fsErrReadUnderflow;
+
+ default:
+ return FsRef::fsErrUnknown;
+ }
+}
+#endif
+
+
+
+void
+Ndbfs::execCONTINUEB(Signal* signal)
+{
+ jamEntry();
+ if (signal->theData[0] == NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY) {
+ jam();
+
+ // Also send CONTINUEB to ourself in order to scan for
+ // incoming answers from AsyncFile on MemoryChannel theFromThreads
+ signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_10MS_DELAY;
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 1);
+ if (scanningInProgress == true) {
+ jam();
+ return;
+ }
+ }
+ if (scanIPC(signal)) {
+ jam();
+ scanningInProgress = true;
+ signal->theData[0] = NdbfsContinueB::ZSCAN_MEMORYCHANNEL_NO_DELAY;
+ sendSignal(reference(), GSN_CONTINUEB, signal, 1, JBB);
+ } else {
+ jam();
+ scanningInProgress = false;
+ }
+ return;
+}
+
+bool Global_useO_SYNC = false;
+bool Global_useO_DIRECT = false;
+bool Global_unlinkO_CREAT = false;
+Uint32 Global_syncFreq = 1024 * 1024;
+
+void
+Ndbfs::execDUMP_STATE_ORD(Signal* signal)
+{
+ if(signal->theData[0] == 19){
+ if(signal->length() > 1){
+ Global_useO_SYNC = signal->theData[1];
+ }
+ if(signal->length() > 2){
+ Global_syncFreq = signal->theData[2] * 1024 * 1024;
+ }
+ if(signal->length() > 3){
+ Global_unlinkO_CREAT = signal->theData[3];
+ }
+ if(signal->length() > 4){
+ Global_useO_DIRECT = signal->theData[4];
+ }
+ ndbout_c("useO_SYNC = %d syncFreq = %d unlinkO_CREATE = %d O_DIRECT = %d",
+ Global_useO_SYNC,
+ Global_syncFreq,
+ Global_unlinkO_CREAT,
+ Global_useO_DIRECT);
+ return;
+ }
+ if(signal->theData[0] == DumpStateOrd::NdbfsDumpFileStat){
+ infoEvent("NDBFS: Files: %d Open files: %d",
+ theFiles.size(),
+ theOpenFiles.size());
+ infoEvent(" Idle files: %d Max opened files: %d",
+ theIdleFiles.size(),
+ m_maxOpenedFiles);
+ infoEvent(" Max files: %d",
+ m_maxFiles);
+ infoEvent(" Requests: %d",
+ theRequestPool->size());
+
+ return;
+ }
+ if(signal->theData[0] == DumpStateOrd::NdbfsDumpOpenFiles){
+ infoEvent("NDBFS: Dump open files: %d", theOpenFiles.size());
+
+ for (unsigned i = 0; i < theOpenFiles.size(); i++){
+ AsyncFile* file = theOpenFiles.getFile(i);
+ infoEvent("%2d (0x%x): %s", i,file, file->theFileName.c_str());
+ }
+ return;
+ }
+ if(signal->theData[0] == DumpStateOrd::NdbfsDumpAllFiles){
+ infoEvent("NDBFS: Dump all files: %d", theFiles.size());
+
+ for (unsigned i = 0; i < theFiles.size(); i++){
+ AsyncFile* file = theFiles[i];
+ infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED");
+ }
+ return;
+ }
+ if(signal->theData[0] == DumpStateOrd::NdbfsDumpIdleFiles){
+ infoEvent("NDBFS: Dump idle files: %d", theIdleFiles.size());
+
+ for (unsigned i = 0; i < theIdleFiles.size(); i++){
+ AsyncFile* file = theIdleFiles[i];
+ infoEvent("%2d (0x%x): %s", i,file, file->isOpen()?"OPEN":"CLOSED");
+ }
+ return;
+ }
+}//Ndbfs::execDUMP_STATE_ORD()
+
+
+
+BLOCK_FUNCTIONS(Ndbfs);
+
diff --git a/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp b/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp
new file mode 100644
index 00000000000..080196a9ea5
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/Ndbfs.hpp
@@ -0,0 +1,126 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef SIMBLOCKASYNCFILESYSTEM_H
+#define SIMBLOCKASYNCFILESYSTEM_H
+
+#include <pc.hpp>
+#include <SimulatedBlock.hpp>
+#include "Pool.hpp"
+#include "AsyncFile.hpp"
+#include "OpenFiles.hpp"
+
+
+
+// Because one NDB Signal request can result in multiple requests to
+// AsyncFile one class must be made responsible to keep track
+// of all out standing request and when all are finished the result
+// must be reported to the sending block.
+
+
+class Ndbfs : public SimulatedBlock
+{
+public:
+ Ndbfs(const class Configuration & conf);
+ virtual ~Ndbfs();
+
+protected:
+ BLOCK_DEFINES(Ndbfs);
+
+ // The signal processing functions
+ void execDUMP_STATE_ORD(Signal* signal);
+ void execFSOPENREQ(Signal* signal);
+ void execFSCLOSEREQ(Signal* signal);
+ void execFSWRITEREQ(Signal* signal);
+ void execFSREADREQ(Signal* signal);
+ void execFSSYNCREQ(Signal* signal);
+ void execFSAPPENDREQ(Signal* signal);
+ void execFSREMOVEREQ(Signal* signal);
+ void execSTTOR(Signal* signal);
+ void execCONTINUEB(Signal* signal);
+
+ bool scanningInProgress;
+ Uint16 newId();
+
+private:
+ int forward(AsyncFile *file, Request* Request);
+ void report(Request* request, Signal* signal);
+ bool scanIPC(Signal* signal);
+
+ // Declared but not defined
+ Ndbfs(Ndbfs & );
+ void operator = (Ndbfs &);
+
+ // Used for uniqe number generation
+ Uint16 theLastId;
+ BlockReference cownref;
+
+ // Communication from files
+ MemoryChannel<Request> theFromThreads;
+
+ Pool<Request>* theRequestPool;
+
+ AsyncFile* createAsyncFile();
+ AsyncFile* getIdleFile();
+
+ Vector<AsyncFile*> theFiles; // List all created AsyncFiles
+ Vector<AsyncFile*> theIdleFiles; // List of idle AsyncFiles
+ OpenFiles theOpenFiles; // List of open AsyncFiles
+ const char * theFileSystemPath;
+
+ // Statistics variables
+ Uint32 m_maxOpenedFiles;
+
+ // Limit for max number of AsyncFiles created
+ Uint32 m_maxFiles;
+
+ void readWriteRequest( int action, Signal * signal );
+
+ static int translateErrno(int aErrno);
+};
+
+class VoidFs : public SimulatedBlock
+{
+public:
+ VoidFs(const class Configuration & conf);
+ virtual ~VoidFs();
+
+protected:
+ BLOCK_DEFINES(VoidFs);
+
+ // The signal processing functions
+ void execDUMP_STATE_ORD(Signal* signal);
+ void execFSOPENREQ(Signal* signal);
+ void execFSCLOSEREQ(Signal* signal);
+ void execFSWRITEREQ(Signal* signal);
+ void execFSREADREQ(Signal* signal);
+ void execFSSYNCREQ(Signal* signal);
+ void execFSAPPENDREQ(Signal* signal);
+ void execFSREMOVEREQ(Signal* signal);
+ void execSTTOR(Signal* signal);
+
+private:
+ // Declared but not defined
+ VoidFs(VoidFs & );
+ void operator = (VoidFs &);
+
+ // Used for uniqe number generation
+ Uint32 c_maxFileNo;
+};
+
+#endif
+
+
diff --git a/ndb/src/kernel/blocks/ndbfs/OpenFiles.hpp b/ndb/src/kernel/blocks/ndbfs/OpenFiles.hpp
new file mode 100644
index 00000000000..b944bb5485b
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/OpenFiles.hpp
@@ -0,0 +1,114 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef OPENFILES_H
+#define OPENFILES_H
+
+#include <Vector.hpp>
+
+class OpenFiles
+{
+public:
+ OpenFiles(){ }
+
+ /* Get a pointer to the file with id */
+ AsyncFile* find(Uint16 id);
+ /* Insert file with id */
+ bool insert(AsyncFile* file, Uint16 id);
+ /* Erase file with id */
+ bool erase(Uint16 id);
+ /* Get number of open files */
+ unsigned size();
+
+ Uint16 getId(unsigned i);
+ AsyncFile* getFile(unsigned i);
+
+
+private:
+
+ class OpenFileItem {
+ public:
+ OpenFileItem(): m_file(NULL), m_id(0){};
+
+ AsyncFile* m_file;
+ Uint16 m_id;
+ };
+
+ Vector<OpenFileItem> m_files;
+};
+
+
+//*****************************************************************************
+inline AsyncFile* OpenFiles::find(Uint16 id){
+ for (unsigned i = 0; i < m_files.size(); i++){
+ if (m_files[i].m_id == id){
+ return m_files[i].m_file;
+ }
+ }
+ return NULL;
+}
+
+//*****************************************************************************
+inline bool OpenFiles::erase(Uint16 id){
+ for (unsigned i = 0; i < m_files.size(); i++){
+ if (m_files[i].m_id == id){
+ m_files.erase(i);
+ return true;
+ }
+ }
+ // Item was not found in list
+ return false;
+}
+
+
+//*****************************************************************************
+inline bool OpenFiles::insert(AsyncFile* file, Uint16 id){
+ // Check if file has already been opened
+ for (unsigned i = 0; i < m_files.size(); i++){
+ if(m_files[i].m_file == NULL)
+ continue;
+
+ if(strcmp(m_files[i].m_file->theFileName.c_str(),
+ file->theFileName.c_str()) == 0){
+ ERROR_SET(fatal, AFS_ERROR_ALLREADY_OPEN,"","OpenFiles::insert()");
+ }
+ }
+
+ // Insert the file into vector
+ OpenFileItem openFile;
+ openFile.m_id = id;
+ openFile.m_file = file;
+ m_files.push_back(openFile);
+
+ return true;
+}
+
+//*****************************************************************************
+inline Uint16 OpenFiles::getId(unsigned i){
+ return m_files[i].m_id;
+}
+
+//*****************************************************************************
+inline AsyncFile* OpenFiles::getFile(unsigned i){
+ return m_files[i].m_file;
+}
+
+//*****************************************************************************
+inline unsigned OpenFiles::size(){
+ return m_files.size();
+}
+
+#endif
diff --git a/ndb/src/kernel/blocks/ndbfs/Pool.hpp b/ndb/src/kernel/blocks/ndbfs/Pool.hpp
new file mode 100644
index 00000000000..a26fa730727
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/Pool.hpp
@@ -0,0 +1,262 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef FOR_LIB_POOL_H
+#define FOR_LIB_POOL_H
+
+
+//===========================================================================
+//
+// .PUBLIC
+//
+//===========================================================================
+
+////////////////////////////////////////////////////////////////
+//
+// enum { defInitSize = 256, defIncSize = 64 };
+// Description: type to store initial and incremental size in.
+//
+////////////////////////////////////////////////////////////////
+//
+// Pool(int anInitSize = defInitSize, int anIncSize = defIncSize);
+// Description:
+// Constructor. Allocates anInitSize of objects <template argument>.
+// When the pool runs out of elements, anIncSize elements are added to the
+// pool. (When the pool is not optimized to allocate multiple elements
+// more efficient, the anIncSize MUST be set to 1 to get the best
+// performance...
+//
+// Parameters:
+// defInitSize: Initial size of the pool (# of elements in the pool)
+// defIncSize: # of elements added to the pool when a request to an empty
+// pool is made.
+// Return value:
+// _
+// Errors:
+// -
+// Asserts:
+// _
+//
+////////////////////////////////////////////////////////////////
+//
+// virtual ~Pool();
+// Description:
+// Elements in the pool are all deallocated.
+// Parameters:
+// _
+// Return value:
+// _
+// Errors:
+// -
+// Asserts:
+// theEmptyNodeList==0. No elements are in still in use.
+//
+////////////////////////////////////////////////////////////////
+//
+// T& get();
+// Description:
+// get's an element from the Pool.
+// Parameters:
+// _
+// Return value:
+// T& the element extracted from the Pool. (element must be cleared to
+// mimick newly created element)
+// Errors:
+// -
+// Asserts:
+// _
+//
+////////////////////////////////////////////////////////////////
+//
+// void put(T& aT);
+// Description:
+// Returns an element to the pool.
+// Parameters:
+// aT The element to put back in the pool
+// Return value:
+// void
+// Errors:
+// -
+// Asserts:
+// The pool has "empty" elements, to put element back in...
+//
+//===========================================================================
+//
+// .PRIVATE
+//
+//===========================================================================
+
+////////////////////////////////////////////////////////////////
+//
+// void allocate(int aSize);
+// Description:
+// add aSize elements to the pool
+// Parameters:
+// aSize: # of elements to add to the pool
+// Return value:
+// void
+// Errors:
+// -
+// Asserts:
+// _
+//
+////////////////////////////////////////////////////////////////
+//
+// void deallocate();
+// Description:
+// frees all elements kept in the pool.
+// Parameters:
+// _
+// Return value:
+// void
+// Errors:
+// -
+// Asserts:
+// No elements are "empty" i.e. in use.
+//
+//===========================================================================
+//
+// .PRIVATE
+//
+//===========================================================================
+
+////////////////////////////////////////////////////////////////
+//
+// Pool<T>& operator=(const Pool<T>& cp);
+// Description:
+// Prohibit use of assignement operator.
+// Parameters:
+// cp
+// Return value:
+// Pool<T>&
+// Asserts:
+// _
+//
+////////////////////////////////////////////////////////////////
+//
+// Pool(const Pool<T>& cp);
+// Description:
+// Prohibit use of default copy constructor.
+// Parameters:
+// cp
+// Return value:
+// _
+// Errors:
+// -
+// Asserts:
+// _
+//
+////////////////////////////////////////////////////////////////
+//
+// int initSize;
+// Description: size of the initial size of the pool
+//
+////////////////////////////////////////////////////////////////
+//
+// int incSize;
+// Description: # of elements added to the pool when pool is exhausted.
+//
+////////////////////////////////////////////////////////////////
+//
+// PoolElement<T>* theFullNodeList;
+// Description: List to contain all "unused" elements in the pool
+//
+////////////////////////////////////////////////////////////////
+//
+// PoolElement<T>* theEmptyNodeList;
+// Description: List to contain all "in use" elements in the pool
+//
+//-------------------------------------------------------------------------
+
+template <class T>
+class Pool
+{
+public:
+ enum { defInitSize = 256, defIncSize = 64 };
+
+ Pool(int anInitSize = defInitSize, int anIncSize = defIncSize) :
+ theIncSize(anIncSize),
+ theTop(0),
+ theCurrentSize(0),
+ theList(0)
+ {
+ allocate(anInitSize);
+ }
+
+ virtual ~Pool(void)
+ {
+ for (int i=0; i <theTop ; ++i)
+ delete theList[i];
+
+ delete []theList;
+ }
+
+ T* get();
+ void put(T* aT);
+
+ unsigned size(){ return theTop; };
+
+protected:
+ void allocate(int aSize)
+ {
+ T** tList = theList;
+ int i;
+ theList = new T*[aSize+theCurrentSize];
+ REQUIRE(theList != 0, "Allocate in Pool.hpp failed");
+ // allocate full list
+ for (i = 0; i < theTop; i++) {
+ theList[i] = tList[i];
+ }
+ delete []tList;
+ for (; (theTop < aSize); theTop++){
+ theList[theTop] = (T*)new T;
+ }
+ theCurrentSize += aSize;
+ }
+
+private:
+ Pool<T>& operator=(const Pool<T>& cp);
+ Pool(const Pool<T>& cp);
+
+ int theIncSize;
+ int theTop;
+ int theCurrentSize;
+
+ T** theList;
+};
+
+//******************************************************************************
+template <class T> inline T* Pool<T>::get()
+{
+ T* tmp;
+ if( theTop == 0 )
+ {
+ allocate(theIncSize);
+ }
+ --theTop;
+ tmp = theList[theTop];
+ return tmp;
+}
+
+//
+//******************************************************************************
+template <class T> inline void Pool<T>::put(T* aT)
+{
+ theList[theTop]= aT;
+ ++theTop;
+}
+
+#endif
diff --git a/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp b/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp
new file mode 100644
index 00000000000..d3407e8d4e7
--- /dev/null
+++ b/ndb/src/kernel/blocks/ndbfs/VoidFs.cpp
@@ -0,0 +1,200 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <limits.h>
+#include <errno.h>
+
+#include "Ndbfs.hpp"
+#include "AsyncFile.hpp"
+#include "Filename.hpp"
+#include "Error.hpp"
+
+#include <signaldata/FsOpenReq.hpp>
+#include <signaldata/FsCloseReq.hpp>
+#include <signaldata/FsReadWriteReq.hpp>
+#include <signaldata/FsAppendReq.hpp>
+#include <signaldata/FsRemoveReq.hpp>
+#include <signaldata/FsConf.hpp>
+#include <signaldata/FsRef.hpp>
+#include <signaldata/NdbfsContinueB.hpp>
+#include <signaldata/DumpStateOrd.hpp>
+
+#include <RefConvert.hpp>
+#include <NdbSleep.h>
+#include <NdbOut.hpp>
+#include <Configuration.hpp>
+
+#define DEBUG(x) { ndbout << "FS::" << x << endl; }
+
+VoidFs::VoidFs(const Configuration & conf) :
+ SimulatedBlock(NDBFS, conf)
+{
+ BLOCK_CONSTRUCTOR(VoidFs);
+
+ // Set received signals
+ addRecSignal(GSN_DUMP_STATE_ORD, &VoidFs::execDUMP_STATE_ORD);
+ addRecSignal(GSN_STTOR, &VoidFs::execSTTOR);
+ addRecSignal(GSN_FSOPENREQ, &VoidFs::execFSOPENREQ);
+ addRecSignal(GSN_FSCLOSEREQ, &VoidFs::execFSCLOSEREQ);
+ addRecSignal(GSN_FSWRITEREQ, &VoidFs::execFSWRITEREQ);
+ addRecSignal(GSN_FSREADREQ, &VoidFs::execFSREADREQ);
+ addRecSignal(GSN_FSSYNCREQ, &VoidFs::execFSSYNCREQ);
+ addRecSignal(GSN_FSAPPENDREQ, &VoidFs::execFSAPPENDREQ);
+ addRecSignal(GSN_FSREMOVEREQ, &VoidFs::execFSREMOVEREQ);
+ // Set send signals
+}
+
+VoidFs::~VoidFs()
+{
+}
+
+void
+VoidFs::execSTTOR(Signal* signal)
+{
+ jamEntry();
+
+ if(signal->theData[1] == 0){ // StartPhase 0
+ jam();
+ signal->theData[3] = 255;
+ sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 4, JBB);
+ return;
+ }
+ ndbrequire(0);
+}
+
+void
+VoidFs::execFSOPENREQ(Signal* signal)
+{
+ jamEntry();
+ const FsOpenReq * const fsOpenReq = (FsOpenReq *)&signal->theData[0];
+ const BlockReference userRef = fsOpenReq->userReference;
+ const Uint32 userPointer = fsOpenReq->userPointer;
+
+ Uint32 flags = fsOpenReq->fileFlags;
+ if(flags == FsOpenReq::OM_READONLY){
+ // Initialise FsRef signal
+ FsRef * const fsRef = (FsRef *)&signal->theData[0];
+ fsRef->userPointer = userPointer;
+ fsRef->errorCode = FsRef::fsErrFileDoesNotExist;
+ fsRef->osErrorCode = ~0;
+ sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
+ return;
+ }
+
+ if(flags & FsOpenReq::OM_WRITEONLY || flags & FsOpenReq::OM_READWRITE){
+ signal->theData[0] = userPointer;
+ signal->theData[1] = c_maxFileNo++;
+ sendSignal(userRef, GSN_FSOPENCONF, signal, 2, JBB);
+ }
+}
+
+void
+VoidFs::execFSREMOVEREQ(Signal* signal)
+{
+ jamEntry();
+ const FsRemoveReq * const req = (FsRemoveReq *)signal->getDataPtr();
+ const Uint32 userRef = req->userReference;
+ const Uint32 userPointer = req->userPointer;
+
+ signal->theData[0] = userPointer;
+ sendSignal(userRef, GSN_FSREMOVECONF, signal, 1, JBB);
+}
+
+/*
+ * PR0: File Pointer DR0: User reference DR1: User Pointer DR2: Flag bit 0= 1
+ * remove file
+ */
+void
+VoidFs::execFSCLOSEREQ(Signal * signal)
+{
+ jamEntry();
+
+ const FsCloseReq * const req = (FsCloseReq *)signal->getDataPtr();
+ const Uint32 userRef = req->userReference;
+ const Uint32 userPointer = req->userPointer;
+
+ signal->theData[0] = userPointer;
+ sendSignal(userRef, GSN_FSCLOSECONF, signal, 1, JBB);
+}
+
+void
+VoidFs::execFSWRITEREQ(Signal* signal)
+{
+ jamEntry();
+ const FsReadWriteReq * const req = (FsReadWriteReq *)signal->getDataPtr();
+ const Uint32 userRef = req->userReference;
+ const Uint32 userPointer = req->userPointer;
+
+ signal->theData[0] = userPointer;
+ sendSignal(userRef, GSN_FSWRITECONF, signal, 1, JBB);
+}
+
+void
+VoidFs::execFSREADREQ(Signal* signal)
+{
+ jamEntry();
+
+ const FsReadWriteReq * const req = (FsReadWriteReq *)signal->getDataPtr();
+ const Uint32 userRef = req->userReference;
+ const Uint32 userPointer = req->userPointer;
+
+ signal->theData[0] = userPointer;
+ sendSignal(userRef, GSN_FSREADCONF, signal, 1, JBB);
+#if 0
+ FsRef * const fsRef = (FsRef *)&signal->theData[0];
+ fsRef->userPointer = userPointer;
+ fsRef->errorCode = FsRef::fsErrEnvironmentError;
+ fsRef->osErrorCode = ~0; // Indicate local error
+ sendSignal(userRef, GSN_FSREADREF, signal, 3, JBB);
+#endif
+}
+
+void
+VoidFs::execFSSYNCREQ(Signal * signal)
+{
+ jamEntry();
+
+ BlockReference userRef = signal->theData[1];
+ const UintR userPointer = signal->theData[2];
+
+ signal->theData[0] = userPointer;
+ sendSignal(userRef, GSN_FSSYNCCONF, signal, 1, JBB);
+
+ return;
+}
+
+void
+VoidFs::execFSAPPENDREQ(Signal * signal)
+{
+ const FsAppendReq * const fsReq = (FsAppendReq *)&signal->theData[0];
+ const UintR userPointer = fsReq->userPointer;
+ const BlockReference userRef = fsReq->userReference;
+ const Uint32 size = fsReq->size;
+
+ signal->theData[0] = userPointer;
+ signal->theData[1] = size << 2;
+ sendSignal(userRef, GSN_FSAPPENDCONF, signal, 2, JBB);
+}
+
+void
+VoidFs::execDUMP_STATE_ORD(Signal* signal)
+{
+}//VoidFs::execDUMP_STATE_ORD()
+
+
+
+BLOCK_FUNCTIONS(VoidFs);
+