diff options
Diffstat (limited to 'ndb/src/ndbapi/NdbEventOperationImpl.cpp')
-rw-r--r-- | ndb/src/ndbapi/NdbEventOperationImpl.cpp | 523 |
1 files changed, 323 insertions, 200 deletions
diff --git a/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/ndb/src/ndbapi/NdbEventOperationImpl.cpp index 87bbca5fc71..208525bfc15 100644 --- a/ndb/src/ndbapi/NdbEventOperationImpl.cpp +++ b/ndb/src/ndbapi/NdbEventOperationImpl.cpp @@ -55,14 +55,17 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, const char* eventName, const int bufferLength) : NdbEventOperation(*this), m_ndb(theNdb), - m_state(ERROR), m_bufferL(bufferLength) + m_state(EO_ERROR), m_bufferL(bufferLength) { - m_eventId = 0; - theFirstRecAttrs[0] = NULL; - theCurrentRecAttrs[0] = NULL; - theFirstRecAttrs[1] = NULL; - theCurrentRecAttrs[1] = NULL; + theFirstPkAttrs[0] = NULL; + theCurrentPkAttrs[0] = NULL; + theFirstPkAttrs[1] = NULL; + theCurrentPkAttrs[1] = NULL; + theFirstDataAttrs[0] = NULL; + theCurrentDataAttrs[0] = NULL; + theFirstDataAttrs[1] = NULL; + theCurrentDataAttrs[1] = NULL; sdata = NULL; ptr[0].p = NULL; ptr[1].p = NULL; @@ -71,16 +74,17 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, // we should lookup id in Dictionary, TODO // also make sure we only have one listener on each event - if (!m_ndb) { ndbout_c("m_ndb=NULL"); return; } + if (!m_ndb) abort(); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); - if (!myDict) { ndbout_c("getDictionary=NULL"); return; } + if (!myDict) { m_error.code= m_ndb->getNdbError().code; return; } const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName); - if (!myEvnt) { ndbout_c("getEvent()=NULL"); return; } + if (!myEvnt) { m_error.code= myDict->getNdbError().code; return; } m_eventImpl = &myEvnt->m_impl; - if (!m_eventImpl) { ndbout_c("m_impl=NULL"); return; } + + m_eventId = m_eventImpl->m_eventId; m_bufferHandle = m_ndb->getGlobalEventBufferHandle(); if (m_bufferHandle->m_bufferL > 0) @@ -88,25 +92,30 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, else m_bufferHandle->m_bufferL = m_bufferL; - m_state = CREATED; + m_state = EO_CREATED; } NdbEventOperationImpl::~NdbEventOperationImpl() { int i; - if (sdata) NdbMem_Free(sdata); - for (i=0 ; i<3; i++) { - if (ptr[i].p) NdbMem_Free(ptr[i].p); + if (sdata) NdbMem_Free((char*)sdata); + for (i=0 ; i<2; i++) { + NdbRecAttr *p = theFirstPkAttrs[i]; + while (p) { + NdbRecAttr *p_next = p->next(); + m_ndb->releaseRecAttr(p); + p = p_next; + } } for (i=0 ; i<2; i++) { - NdbRecAttr *p = theFirstRecAttrs[i]; + NdbRecAttr *p = theFirstDataAttrs[i]; while (p) { NdbRecAttr *p_next = p->next(); m_ndb->releaseRecAttr(p); p = p_next; } } - if (m_state == NdbEventOperation::EXECUTING) { + if (m_state == EO_EXECUTING) { stop(); // m_bufferHandle->dropSubscribeEvent(m_bufferId); ; // We should send stop signal here @@ -122,36 +131,50 @@ NdbEventOperationImpl::getState() NdbRecAttr* NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n) { - if (m_state != NdbEventOperation::CREATED) { + DBUG_ENTER("NdbEventOperationImpl::getValue"); + if (m_state != EO_CREATED) { ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()"); - return NULL; + DBUG_RETURN(NULL); } NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName); if (tAttrInfo == NULL) { ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName); - return NULL; + DBUG_RETURN(NULL); } - return NdbEventOperationImpl::getValue(tAttrInfo, aValue, n); + DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n)); } NdbRecAttr* NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n) { + DBUG_ENTER("NdbEventOperationImpl::getValue"); // Insert Attribute Id into ATTRINFO part. - NdbRecAttr *&theFirstRecAttr = theFirstRecAttrs[n]; - NdbRecAttr *&theCurrentRecAttr = theCurrentRecAttrs[n]; - + + NdbRecAttr **theFirstAttr; + NdbRecAttr **theCurrentAttr; + + if (tAttrInfo->getPrimaryKey()) + { + theFirstAttr = &theFirstPkAttrs[n]; + theCurrentAttr = &theCurrentPkAttrs[n]; + } + else + { + theFirstAttr = &theFirstDataAttrs[n]; + theCurrentAttr = &theCurrentDataAttrs[n]; + } + /************************************************************************ * Get a Receive Attribute object and link it into the operation object. ************************************************************************/ - NdbRecAttr *tRecAttr = m_ndb->getRecAttr(); - if (tRecAttr == NULL) { + NdbRecAttr *tAttr = m_ndb->getRecAttr(); + if (tAttr == NULL) { exit(-1); //setErrorCodeAbort(4000); - return NULL; + DBUG_RETURN(NULL); } /********************************************************************** @@ -159,63 +182,65 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in * the RecAttr object * Also set attribute size, array size and attribute type ********************************************************************/ - if (tRecAttr->setup(tAttrInfo, aValue)) { + if (tAttr->setup(tAttrInfo, aValue)) { //setErrorCodeAbort(4000); - m_ndb->releaseRecAttr(tRecAttr); + m_ndb->releaseRecAttr(tAttr); exit(-1); - return NULL; + DBUG_RETURN(NULL); } //theErrorLine++; - tRecAttr->setNULL(); + tAttr->setUNDEFINED(); // We want to keep the list sorted to make data insertion easier later - if (theFirstRecAttr == NULL) { - theFirstRecAttr = tRecAttr; - theCurrentRecAttr = tRecAttr; - tRecAttr->next(NULL); + + if (*theFirstAttr == NULL) { + *theFirstAttr = tAttr; + *theCurrentAttr = tAttr; + tAttr->next(NULL); } else { Uint32 tAttrId = tAttrInfo->m_attrId; - if (tAttrId > theCurrentRecAttr->attrId()) { // right order - theCurrentRecAttr->next(tRecAttr); - tRecAttr->next(NULL); - theCurrentRecAttr = tRecAttr; - } else if (theFirstRecAttr->next() == NULL || // only one in list - theFirstRecAttr->attrId() > tAttrId) {// or first - tRecAttr->next(theFirstRecAttr); - theFirstRecAttr = tRecAttr; + if (tAttrId > (*theCurrentAttr)->attrId()) { // right order + (*theCurrentAttr)->next(tAttr); + tAttr->next(NULL); + *theCurrentAttr = tAttr; + } else if ((*theFirstAttr)->next() == NULL || // only one in list + (*theFirstAttr)->attrId() > tAttrId) {// or first + tAttr->next(*theFirstAttr); + *theFirstAttr = tAttr; } else { // at least 2 in list and not first and not last - NdbRecAttr *p = theFirstRecAttr; + NdbRecAttr *p = *theFirstAttr; NdbRecAttr *p_next = p->next(); while (tAttrId > p_next->attrId()) { p = p_next; p_next = p->next(); } if (tAttrId == p_next->attrId()) { // Using same attribute twice - tRecAttr->release(); // do I need to do this? - m_ndb->releaseRecAttr(tRecAttr); + tAttr->release(); // do I need to do this? + m_ndb->releaseRecAttr(tAttr); exit(-1); - return NULL; + DBUG_RETURN(NULL); } // this is it, between p and p_next - p->next(tRecAttr); - tRecAttr->next(p_next); + p->next(tAttr); + tAttr->next(p_next); } } - - return tRecAttr; + DBUG_RETURN(tAttr); } int NdbEventOperationImpl::execute() { + DBUG_ENTER("NdbEventOperationImpl::execute"); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); if (!myDict) { - ndbout_c("NdbEventOperation::execute(): getDictionary=NULL"); - return 0; + m_error.code= m_ndb->getNdbError().code; + DBUG_RETURN(-1); } - if (theFirstRecAttrs[0] == NULL) { // defaults to get all + if (theFirstPkAttrs[0] == NULL && + theFirstDataAttrs[0] == NULL) { // defaults to get all } @@ -223,13 +248,18 @@ NdbEventOperationImpl::execute() int hasSubscriber; - m_bufferId = - m_bufferHandle->prepareAddSubscribeEvent(m_eventImpl->m_eventId, - hasSubscriber /* return value */); + int r= m_bufferHandle->prepareAddSubscribeEvent(this, + hasSubscriber /*return value*/); + m_error.code= 4709; + + if (r < 0) + { + DBUG_RETURN(-1); + } - m_eventImpl->m_bufferId = m_bufferId; + m_eventImpl->m_bufferId = m_bufferId = (Uint32)r; - int r = -1; + r = -1; if (m_bufferId >= 0) { // now we check if there's already a subscriber @@ -241,30 +271,33 @@ NdbEventOperationImpl::execute() if (r) { //Error m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId); - m_state = NdbEventOperation::ERROR; + m_state = EO_ERROR; } else { m_bufferHandle->addSubscribeEvent(m_bufferId, this); - m_state = NdbEventOperation::EXECUTING; + m_state = EO_EXECUTING; } } else { //Error - m_state = NdbEventOperation::ERROR; + m_state = EO_ERROR; } - return r; + DBUG_RETURN(r); } int NdbEventOperationImpl::stop() { - if (m_state != NdbEventOperation::EXECUTING) - return -1; + DBUG_ENTER("NdbEventOperationImpl::stop"); + if (m_state != EO_EXECUTING) + { + DBUG_RETURN(-1); + } // ndbout_c("NdbEventOperation::stopping()"); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); if (!myDict) { - ndbout_c("NdbEventOperation::stop(): getDictionary=NULL"); - return 0; + m_error.code= m_ndb->getNdbError().code; + DBUG_RETURN(-1); } NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict); @@ -275,8 +308,8 @@ NdbEventOperationImpl::stop() hasSubscriber /* return value */); if (ret < 0) { - ndbout_c("prepareDropSubscribeEvent failed"); - return -1; + m_error.code= 4712; + DBUG_RETURN(-1); } // m_eventImpl->m_bufferId = m_bufferId; @@ -293,17 +326,17 @@ NdbEventOperationImpl::stop() if (r) { //Error m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId); - m_state = NdbEventOperation::ERROR; + m_error.code= myDictImpl.m_error.code; + m_state = EO_ERROR; } else { #ifdef EVENT_DEBUG ndbout_c("NdbEventOperation::dropping()"); #endif m_bufferHandle->dropSubscribeEvent(m_bufferId); - m_state = NdbEventOperation::CREATED; + m_state = EO_CREATED; } - - return r; + DBUG_RETURN(r); } bool @@ -327,6 +360,7 @@ NdbEventOperationImpl::getLatestGCI() int NdbEventOperationImpl::next(int *pOverrun) { + DBUG_ENTER("NdbEventOperationImpl::next"); int nr = 10000; // a high value int tmpOverrun = 0; int *ptmpOverrun; @@ -343,7 +377,10 @@ NdbEventOperationImpl::next(int *pOverrun) *pOverrun = tmpOverrun; } - if (r <= 0) return r; // no data + if (r <= 0) + { + DBUG_RETURN(r); // no data + } if (r < nr) r = nr; else nr--; // we don't want to be stuck here forever @@ -352,8 +389,13 @@ NdbEventOperationImpl::next(int *pOverrun) #endif // now move the data into the RecAttrs - if ((theFirstRecAttrs[0] == NULL) && - (theFirstRecAttrs[1] == NULL)) return r; + if ((theFirstPkAttrs[0] == NULL) && + (theFirstPkAttrs[1] == NULL) && + (theFirstDataAttrs[0] == NULL) && + (theFirstDataAttrs[1] == NULL)) + { + DBUG_RETURN(r); + } // no copying since no RecAttr's @@ -364,20 +406,37 @@ NdbEventOperationImpl::next(int *pOverrun) #ifdef EVENT_DEBUG int i; printf("after values sz=%u\n", ptr[1].sz); - for (i=0; i < ptr[1].sz; i++) + for(i=0; i < (int)ptr[1].sz; i++) printf ("H'%.8X ",ptr[1].p[i]); printf("\n"); printf("before values sz=%u\n", ptr[2].sz); - for (i=0; i < ptr[2].sz; i++) + for(i=0; i < (int)ptr[2].sz; i++) printf ("H'%.8X ",ptr[2].p[i]); printf("\n"); #endif - NdbRecAttr *tWorkingRecAttr = theFirstRecAttrs[0]; - // copy data into the RecAttr's // we assume that the respective attribute lists are sorted + // first the pk's + { + NdbRecAttr *tAttr= theFirstPkAttrs[0]; + while(tAttr) + { + assert(aAttrPtr < aAttrEndPtr); + unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize(); + assert(tAttr->attrId() == + AttributeHeader(*aAttrPtr).getAttributeId()); + assert(tAttr->receive_data(aDataPtr, tDataSz)); + // next + aAttrPtr++; + aDataPtr+= tDataSz; + tAttr= tAttr->next(); + } + } + + NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0]; + Uint32 tRecAttrId; Uint32 tAttrId; Uint32 tDataSz; @@ -389,7 +448,7 @@ NdbEventOperationImpl::next(int *pOverrun) while (tAttrId > tRecAttrId) { //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); - tWorkingRecAttr->setNULL(); + tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); if (tWorkingRecAttr == NULL) break; @@ -401,32 +460,25 @@ NdbEventOperationImpl::next(int *pOverrun) //printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); if (tAttrId == tRecAttrId) { - if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()) - hasSomeData++; + hasSomeData++; //printf("set!\n"); - tWorkingRecAttr->receive_data(aDataPtr, tDataSz); - - // move forward, data has already moved forward - aAttrPtr++; - aDataPtr += tDataSz; + assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz)); tWorkingRecAttr = tWorkingRecAttr->next(); - } else { - // move only attr forward - aAttrPtr++; - aDataPtr += tDataSz; } + aAttrPtr++; + aDataPtr += tDataSz; } while (tWorkingRecAttr != NULL) { tRecAttrId = tWorkingRecAttr->attrId(); //printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId); - tWorkingRecAttr->setNULL(); + tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); } - tWorkingRecAttr = theFirstRecAttrs[1]; + tWorkingRecAttr = theFirstDataAttrs[1]; aDataPtr = ptr[2].p; Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz; while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) { @@ -435,7 +487,7 @@ NdbEventOperationImpl::next(int *pOverrun) tDataSz = AttributeHeader(*aDataPtr).getDataSize(); aDataPtr++; while (tAttrId > tRecAttrId) { - tWorkingRecAttr->setNULL(); + tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); if (tWorkingRecAttr == NULL) break; @@ -444,27 +496,25 @@ NdbEventOperationImpl::next(int *pOverrun) if (tWorkingRecAttr == NULL) break; if (tAttrId == tRecAttrId) { - if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()) - hasSomeData++; + assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey()); + hasSomeData++; - tWorkingRecAttr->receive_data(aDataPtr, tDataSz); - aDataPtr += tDataSz; - // move forward, data+attr has already moved forward + assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz)); tWorkingRecAttr = tWorkingRecAttr->next(); - } else { - // move only data+attr forward - aDataPtr += tDataSz; } + aDataPtr += tDataSz; } while (tWorkingRecAttr != NULL) { - tWorkingRecAttr->setNULL(); + tWorkingRecAttr->setUNDEFINED(); tWorkingRecAttr = tWorkingRecAttr->next(); } if (hasSomeData) - return r; + { + DBUG_RETURN(r); + } } - return 0; + DBUG_RETURN(0); } NdbDictionary::Event::TableEvent @@ -487,10 +537,20 @@ NdbEventOperationImpl::getEventType() void NdbEventOperationImpl::print() { + int i; ndbout << "EventId " << m_eventId << "\n"; - for (int i = 0; i < 2; i++) { - NdbRecAttr *p = theFirstRecAttrs[i]; + for (i = 0; i < 2; i++) { + NdbRecAttr *p = theFirstPkAttrs[i]; + ndbout << " %u " << i; + while (p) { + ndbout << " : " << p->attrId() << " = " << *p; + p = p->next(); + } + ndbout << "\n"; + } + for (i = 0; i < 2; i++) { + NdbRecAttr *p = theFirstDataAttrs[i]; ndbout << " %u " << i; while (p) { ndbout << " : " << p->attrId() << " = " << *p; @@ -639,23 +699,28 @@ NdbGlobalEventBufferHandle::~NdbGlobalEventBufferHandle() void NdbGlobalEventBufferHandle::addBufferId(int bufferId) { + DBUG_ENTER("NdbGlobalEventBufferHandle::addBufferId"); + DBUG_PRINT("enter",("bufferId=%d",bufferId)); if (m_nids >= NDB_MAX_ACTIVE_EVENTS) { ndbout_c("NdbGlobalEventBufferHandle::addBufferId error in paramerer setting"); exit(-1); } m_bufferIds[m_nids] = bufferId; m_nids++; + DBUG_VOID_RETURN; } void NdbGlobalEventBufferHandle::dropBufferId(int bufferId) { + DBUG_ENTER("NdbGlobalEventBufferHandle::dropBufferId"); + DBUG_PRINT("enter",("bufferId=%d",bufferId)); for (int i = 0; i < m_nids; i++) if (m_bufferIds[i] == bufferId) { m_nids--; for (; i < m_nids; i++) m_bufferIds[i] = m_bufferIds[i+1]; - return; + DBUG_VOID_RETURN; } ndbout_c("NdbGlobalEventBufferHandle::dropBufferId %d does not exist", bufferId); @@ -674,10 +739,11 @@ NdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle) } */ int -NdbGlobalEventBufferHandle::prepareAddSubscribeEvent(Uint32 eventId, - int& hasSubscriber) +NdbGlobalEventBufferHandle::prepareAddSubscribeEvent +(NdbEventOperationImpl *eventOp, int& hasSubscriber) { - ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventId, hasSubscriber)); + ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventOp, + hasSubscriber)); } void NdbGlobalEventBufferHandle::addSubscribeEvent @@ -830,57 +896,68 @@ NdbGlobalEventBuffer::~NdbGlobalEventBuffer() // NdbMem_Deallocate(m_eventBufferIdToEventId); } void -NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h, +NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h, int MAX_NUMBER_ACTIVE_EVENTS) { - if (m_handlers.size() == 0) { // First init + DBUG_ENTER("NdbGlobalEventBuffer::real_init"); + DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h)); + if (m_handlers.size() == 0) + { // First init + DBUG_PRINT("info",("first to come")); m_max = MAX_NUMBER_ACTIVE_EVENTS; m_buf = new BufItem[m_max]; - // (BufItem *)NdbMem_Allocate(m_max*sizeof(BufItem)); - for (int i=0; i<m_max; i++) { - m_buf[i].gId = 0; + m_buf[i].gId= 0; } } + assert(m_max == MAX_NUMBER_ACTIVE_EVENTS); // TODO make sure we don't hit roof - // m_handlers[m_nhandlers] = h; m_handlers.push_back(h); - // ndbout_c("NdbGlobalEventBuffer::real_init(), m_handles=%u %u", m_nhandlers, h); + DBUG_VOID_RETURN; } void NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h) { - // ndbout_c("NdbGlobalEventBuffer::real_init_remove(), m_handles=%u %u", m_nhandlers, h); - for (Uint32 i=0 ; i < m_handlers.size(); i++) { - // ndbout_c("%u %u %u", i, m_handlers[i], h); - if (m_handlers[i] == h) { + DBUG_ENTER("NdbGlobalEventBuffer::real_remove"); + DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h)); + for (Uint32 i=0 ; i < m_handlers.size(); i++) + { + DBUG_PRINT("info",("m_handlers[%u] %u", i, m_handlers[i])); + if (m_handlers[i] == h) + { m_handlers.erase(i); - if (m_handlers.size() == 0) { - // ndbout_c("last to go"); + if (m_handlers.size() == 0) + { + DBUG_PRINT("info",("last to go")); delete[] m_buf; m_buf = NULL; - // NdbMem_Free((char*)m_buf); } - return; + DBUG_VOID_RETURN; } } - ndbout_c("NdbGlobalEventBuffer::real_init_remove() non-existing handle"); - exit(-1); + ndbout_c("NdbGlobalEventBuffer::real_remove() non-existing handle"); + DBUG_PRINT("error",("non-existing handle")); + abort(); + DBUG_VOID_RETURN; } -int +int NdbGlobalEventBuffer::real_prepareAddSubscribeEvent -(NdbGlobalEventBufferHandle *aHandle, Uint32 eventId, int& hasSubscriber) +(NdbGlobalEventBufferHandle *aHandle, NdbEventOperationImpl *eventOp, + int& hasSubscriber) { + DBUG_ENTER("NdbGlobalEventBuffer::real_prepareAddSubscribeEvent"); int i; - int bufferId = -1; + int bufferId= -1; + Uint32 eventId= eventOp->m_eventId; + DBUG_PRINT("enter",("eventId: %u", eventId)); // add_drop_lock(); // only one thread can do add or drop at a time // Find place where eventId already set for (i=0; i<m_no; i++) { if (m_buf[i].gId == eventId) { - bufferId = i; + bufferId= i; break; } } @@ -888,53 +965,55 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent // find space for new bufferId for (i=0; i<m_no; i++) { if (m_buf[i].gId == 0) { - bufferId = i; // we found an empty spot - break; + bufferId= i; // we found an empty spot + goto found_bufferId; } } if (bufferId < 0 && m_no < m_max) { // room for more so get that - bufferId=m_no; - m_buf[m_no].gId = 0; + bufferId= m_no; + m_buf[m_no].gId= 0; m_no++; } else { - ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers"); - // add_drop_unlock(); - return -1; + // add_drop_unlock(); + DBUG_PRINT("error",("Can't accept more subscribers:" + " bufferId=%d, m_no=%d, m_max=%d", + bufferId, m_no, m_max)); + DBUG_RETURN(-1); } } +found_bufferId: - BufItem &b = m_buf[ID(bufferId)]; + BufItem &b= m_buf[ID(bufferId)]; if (b.gId == 0) { // first subscriber needs some initialization - bufferId = NO_ID(0, bufferId); + bufferId= NO_ID(0, bufferId); - b.gId = eventId; + b.gId= eventId; + b.eventType= (Uint32)eventOp->m_eventImpl->mi_type; - if ((b.p_buf_mutex = NdbMutex_Create()) == NULL) { + if ((b.p_buf_mutex= NdbMutex_Create()) == NULL) { ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed"); - exit(-1); + abort(); } - b.subs = 0; - b.f = 0; - b.sz = 0; - b.max_sz = aHandle->m_bufferL; - b.data = + b.subs= 0; + b.f= 0; + b.sz= 0; + b.max_sz= aHandle->m_bufferL; + b.data= (BufItem::Data *)NdbMem_Allocate(b.max_sz*sizeof(BufItem::Data)); for (int i = 0; i < b.max_sz; i++) { - b.data[i].sdata = NULL; - b.data[i].ptr[0].p = NULL; - b.data[i].ptr[1].p = NULL; - b.data[i].ptr[2].p = NULL; + b.data[i].sdata= NULL; + b.data[i].ptr[0].p= NULL; + b.data[i].ptr[1].p= NULL; + b.data[i].ptr[2].p= NULL; } } else { -#ifdef EVENT_DEBUG - ndbout_c("NdbGlobalEventBuffer::prepareAddSubscribeEvent: TRYING handle one subscriber per event b.subs = %u", b.subs); -#endif - + DBUG_PRINT("info", + ("TRYING handle one subscriber per event b.subs=%u",b.subs)); int ni = -1; for(int i=0; i < b.subs;i++) { if (b.ps[i].theHandle == NULL) { @@ -946,9 +1025,10 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent if (b.subs < MAX_SUBSCRIBERS_PER_EVENT) { ni = b.subs; } else { - ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers"); + DBUG_PRINT("error", + ("Can't accept more subscribers: b.subs=%d",b.subs)); // add_drop_unlock(); - return -1; + DBUG_RETURN(-1); } } bufferId = NO_ID(ni, bufferId); @@ -969,23 +1049,25 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent else hasSubscriber = 0; -#ifdef EVENT_DEBUG - ndbout_c("prepareAddSubscribeEvent: handed out bufferId %d for eventId %d", - bufferId, eventId); -#endif + DBUG_PRINT("info",("handed out bufferId=%d for eventId=%d hasSubscriber=%d", + bufferId, eventId, hasSubscriber)); /* we now have a lock on the prepare so that no one can mess with this * unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent */ - return bufferId; + DBUG_RETURN(bufferId); } void NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId) { + DBUG_ENTER("NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent"); BufItem &b = m_buf[ID(bufferId)]; int n = NO(bufferId); + DBUG_PRINT("enter", ("bufferId=%d,ID(bufferId)=%d,NO(bufferId)=%d", + bufferId, ID(bufferId), NO(bufferId))); + b.ps[n].theHandle = NULL; // remove subscribers from the end, @@ -998,10 +1080,8 @@ NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId) break; if (b.subs == 0) { -#ifdef EVENT_DEBUG - ndbout_c("unprepareAddSubscribeEvent: no more subscribers left on eventId %d", b.gId); -#endif - b.gId = 0; // We don't have any subscribers, reuse BufItem + DBUG_PRINT("info",("no more subscribers left on eventId %d", b.gId)); + b.gId= 0; // We don't have any subscribers, reuse BufItem if (b.data) { NdbMem_Free((void *)b.data); b.data = NULL; @@ -1012,12 +1092,14 @@ NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId) } } // add_drop_unlock(); + DBUG_VOID_RETURN; } void NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId, void *ndbEventOperation) { + DBUG_ENTER("NdbGlobalEventBuffer::real_addSubscribeEvent"); BufItem &b = m_buf[ID(bufferId)]; int n = NO(bufferId); @@ -1025,9 +1107,8 @@ NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId, b.ps[n].theHandle->addBufferId(bufferId); // add_drop_unlock(); -#ifdef EVENT_DEBUG - ndbout_c("addSubscribeEvent:: added bufferId %d", bufferId); -#endif + DBUG_PRINT("info",("added bufferId %d", bufferId)); + DBUG_VOID_RETURN; } void @@ -1040,6 +1121,7 @@ int NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId, int& hasSubscriber) { + DBUG_ENTER("NdbGlobalEventBuffer::real_prepareDropSubscribeEvent"); // add_drop_lock(); // only one thread can do add or drop at a time BufItem &b = m_buf[ID(bufferId)]; @@ -1055,14 +1137,17 @@ NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId, else if (n == 1) hasSubscriber = 0; else - return -1; + { + DBUG_RETURN(-1); + } - return 0; + DBUG_RETURN(0); } void NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId) { + DBUG_ENTER("NdbGlobalEventBuffer::real_dropSubscribeEvent"); // add_drop_lock(); // only one thread can do add-drop at a time BufItem &b = m_buf[ID(bufferId)]; @@ -1078,6 +1163,7 @@ NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId) #ifdef EVENT_DEBUG ndbout_c("dropSubscribeEvent:: dropped bufferId %d", bufferId); #endif + DBUG_VOID_RETURN; } void @@ -1100,10 +1186,13 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId, const SubTableData * const sdata, LinearSectionPtr ptr[3]) { + DBUG_ENTER("NdbGlobalEventBuffer::real_insertDataL"); BufItem &b = m_buf[ID(bufferId)]; #ifdef EVENT_DEBUG int n = NO(bufferId); #endif + + if ( b.eventType & (1 << (Uint32)sdata->operation) ) { if (b.subs) { #ifdef EVENT_DEBUG @@ -1112,7 +1201,9 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId, // move front forward if (copy_data_alloc(sdata, ptr, b.data[b.f].sdata, b.data[b.f].ptr)) - return -1; + { + DBUG_RETURN(-1); + } for (int i=0; i < b.subs; i++) { NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[i]; if (e.theHandle) { // active subscriber @@ -1120,7 +1211,7 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId, if (e.bufferempty == 0) { e.overrun++; // another item has been overwritten e.b++; // move next-to-read next since old item was overwritten - if (e.b == b.max_sz) e.b = 0; // start from beginning + if (e.b == b.max_sz) e.b= 0; // start from beginning } } e.bufferempty = 0; @@ -1140,21 +1231,35 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId, #endif } } - return 0; + else + { +#ifdef EVENT_DEBUG + ndbout_c("skipped"); +#endif + } + + DBUG_RETURN(0); } int NdbGlobalEventBuffer::hasData(int bufferId) { + DBUG_ENTER("NdbGlobalEventBuffer::hasData"); BufItem &b = m_buf[ID(bufferId)]; int n = NO(bufferId); NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n]; if(e.bufferempty) - return 0; + { + DBUG_RETURN(0); + } if (b.f <= e.b) - return b.max_sz-e.b + b.f; + { + DBUG_RETURN(b.max_sz-e.b + b.f); + } else - return b.f-e.b; + { + DBUG_RETURN(b.f-e.b); + } } int NdbGlobalEventBuffer::real_getDataL(const int bufferId, @@ -1162,6 +1267,7 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId, LinearSectionPtr ptr[3], int *pOverrun) { + DBUG_ENTER("NdbGlobalEventBuffer::real_getDataL"); BufItem &b = m_buf[ID(bufferId)]; int n = NO(bufferId); NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n]; @@ -1172,13 +1278,20 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId, } if (e.bufferempty) - return 0; // nothing to get + { + DBUG_RETURN(0); // nothing to get + } + + DBUG_PRINT("info",("ID(bufferId) %d NO(bufferId) %d e.b %d", + ID(bufferId), NO(bufferId), e.b)); if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr, sdata, ptr)) - return -1; + { + DBUG_RETURN(-1); + } - e.b++; if (e.b == b.max_sz) e.b = 0; // move next-to-read forward + e.b++; if (e.b == b.max_sz) e.b= 0; // move next-to-read forward if (b.f == e.b) // back has cought up with front e.bufferempty = 1; @@ -1187,7 +1300,7 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId, ndbout_c("getting data from buffer %d with eventId %d", bufferId, b.gId); #endif - return hasData(bufferId)+1; + DBUG_RETURN(hasData(bufferId)+1); } int NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata, @@ -1195,49 +1308,59 @@ NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata, SubTableData * &t_sdata, LinearSectionPtr t_ptr[3]) { - if (t_sdata == NULL) { - t_sdata = (SubTableData *)NdbMem_Allocate(sizeof(SubTableData)); - } + DBUG_ENTER("NdbGlobalEventBuffer::copy_data_alloc"); + unsigned sz4= (sizeof(SubTableData)+3)>>2; + Uint32 *ptr= (Uint32*)NdbMem_Allocate((sz4 + + f_ptr[0].sz + + f_ptr[1].sz + + f_ptr[2].sz) * sizeof(Uint32)); + if (t_sdata) + NdbMem_Free((char*)t_sdata); + t_sdata= (SubTableData *)ptr; memcpy(t_sdata,f_sdata,sizeof(SubTableData)); + ptr+= sz4; + for (int i = 0; i < 3; i++) { LinearSectionPtr & f_p = f_ptr[i]; LinearSectionPtr & t_p = t_ptr[i]; if (f_p.sz > 0) { - if (t_p.p == NULL) { - t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz); - } else if (t_p.sz != f_p.sz) { - NdbMem_Free(t_p.p); - t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz); - } + t_p.p= (Uint32 *)ptr; memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz); - } else if (t_p.p != NULL) { - NdbMem_Free(t_p.p); - t_p.p = NULL; + ptr+= f_p.sz; + t_p.sz= f_p.sz; + } else { + t_p.p= NULL; + t_p.sz= 0; } - t_p.sz = f_p.sz; } - return 0; + DBUG_RETURN(0); } int NdbGlobalEventBuffer::real_wait(NdbGlobalEventBufferHandle *h, int aMillisecondNumber) { + DBUG_ENTER("NdbGlobalEventBuffer::real_wait"); // check if there are anything in any of the buffers int i; int n = 0; for (i = 0; i < h->m_nids; i++) n += hasData(h->m_bufferIds[i]); - if (n) return n; + if (n) + { + DBUG_RETURN(n); + } int r = NdbCondition_WaitTimeout(h->p_cond, ndb_global_event_buffer_mutex, aMillisecondNumber); if (r > 0) - return -1; + { + DBUG_RETURN(-1); + } n = 0; for (i = 0; i < h->m_nids; i++) n += hasData(h->m_bufferIds[i]); - return n; + DBUG_RETURN(n); } template class Vector<NdbGlobalEventBufferHandle*>; |