summaryrefslogtreecommitdiff
path: root/ndb/src/ndbapi/NdbEventOperationImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/ndbapi/NdbEventOperationImpl.cpp')
-rw-r--r--ndb/src/ndbapi/NdbEventOperationImpl.cpp523
1 files changed, 323 insertions, 200 deletions
diff --git a/ndb/src/ndbapi/NdbEventOperationImpl.cpp b/ndb/src/ndbapi/NdbEventOperationImpl.cpp
index 87bbca5fc71..208525bfc15 100644
--- a/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+++ b/ndb/src/ndbapi/NdbEventOperationImpl.cpp
@@ -55,14 +55,17 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
const char* eventName,
const int bufferLength)
: NdbEventOperation(*this), m_ndb(theNdb),
- m_state(ERROR), m_bufferL(bufferLength)
+ m_state(EO_ERROR), m_bufferL(bufferLength)
{
-
m_eventId = 0;
- theFirstRecAttrs[0] = NULL;
- theCurrentRecAttrs[0] = NULL;
- theFirstRecAttrs[1] = NULL;
- theCurrentRecAttrs[1] = NULL;
+ theFirstPkAttrs[0] = NULL;
+ theCurrentPkAttrs[0] = NULL;
+ theFirstPkAttrs[1] = NULL;
+ theCurrentPkAttrs[1] = NULL;
+ theFirstDataAttrs[0] = NULL;
+ theCurrentDataAttrs[0] = NULL;
+ theFirstDataAttrs[1] = NULL;
+ theCurrentDataAttrs[1] = NULL;
sdata = NULL;
ptr[0].p = NULL;
ptr[1].p = NULL;
@@ -71,16 +74,17 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
// we should lookup id in Dictionary, TODO
// also make sure we only have one listener on each event
- if (!m_ndb) { ndbout_c("m_ndb=NULL"); return; }
+ if (!m_ndb) abort();
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
- if (!myDict) { ndbout_c("getDictionary=NULL"); return; }
+ if (!myDict) { m_error.code= m_ndb->getNdbError().code; return; }
const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
- if (!myEvnt) { ndbout_c("getEvent()=NULL"); return; }
+ if (!myEvnt) { m_error.code= myDict->getNdbError().code; return; }
m_eventImpl = &myEvnt->m_impl;
- if (!m_eventImpl) { ndbout_c("m_impl=NULL"); return; }
+
+ m_eventId = m_eventImpl->m_eventId;
m_bufferHandle = m_ndb->getGlobalEventBufferHandle();
if (m_bufferHandle->m_bufferL > 0)
@@ -88,25 +92,30 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
else
m_bufferHandle->m_bufferL = m_bufferL;
- m_state = CREATED;
+ m_state = EO_CREATED;
}
NdbEventOperationImpl::~NdbEventOperationImpl()
{
int i;
- if (sdata) NdbMem_Free(sdata);
- for (i=0 ; i<3; i++) {
- if (ptr[i].p) NdbMem_Free(ptr[i].p);
+ if (sdata) NdbMem_Free((char*)sdata);
+ for (i=0 ; i<2; i++) {
+ NdbRecAttr *p = theFirstPkAttrs[i];
+ while (p) {
+ NdbRecAttr *p_next = p->next();
+ m_ndb->releaseRecAttr(p);
+ p = p_next;
+ }
}
for (i=0 ; i<2; i++) {
- NdbRecAttr *p = theFirstRecAttrs[i];
+ NdbRecAttr *p = theFirstDataAttrs[i];
while (p) {
NdbRecAttr *p_next = p->next();
m_ndb->releaseRecAttr(p);
p = p_next;
}
}
- if (m_state == NdbEventOperation::EXECUTING) {
+ if (m_state == EO_EXECUTING) {
stop();
// m_bufferHandle->dropSubscribeEvent(m_bufferId);
; // We should send stop signal here
@@ -122,36 +131,50 @@ NdbEventOperationImpl::getState()
NdbRecAttr*
NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
{
- if (m_state != NdbEventOperation::CREATED) {
+ DBUG_ENTER("NdbEventOperationImpl::getValue");
+ if (m_state != EO_CREATED) {
ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()");
- return NULL;
+ DBUG_RETURN(NULL);
}
NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);
if (tAttrInfo == NULL) {
ndbout_c("NdbEventOperationImpl::getValue attribute %s not found",colName);
- return NULL;
+ DBUG_RETURN(NULL);
}
- return NdbEventOperationImpl::getValue(tAttrInfo, aValue, n);
+ DBUG_RETURN(NdbEventOperationImpl::getValue(tAttrInfo, aValue, n));
}
NdbRecAttr*
NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, int n)
{
+ DBUG_ENTER("NdbEventOperationImpl::getValue");
// Insert Attribute Id into ATTRINFO part.
- NdbRecAttr *&theFirstRecAttr = theFirstRecAttrs[n];
- NdbRecAttr *&theCurrentRecAttr = theCurrentRecAttrs[n];
-
+
+ NdbRecAttr **theFirstAttr;
+ NdbRecAttr **theCurrentAttr;
+
+ if (tAttrInfo->getPrimaryKey())
+ {
+ theFirstAttr = &theFirstPkAttrs[n];
+ theCurrentAttr = &theCurrentPkAttrs[n];
+ }
+ else
+ {
+ theFirstAttr = &theFirstDataAttrs[n];
+ theCurrentAttr = &theCurrentDataAttrs[n];
+ }
+
/************************************************************************
* Get a Receive Attribute object and link it into the operation object.
************************************************************************/
- NdbRecAttr *tRecAttr = m_ndb->getRecAttr();
- if (tRecAttr == NULL) {
+ NdbRecAttr *tAttr = m_ndb->getRecAttr();
+ if (tAttr == NULL) {
exit(-1);
//setErrorCodeAbort(4000);
- return NULL;
+ DBUG_RETURN(NULL);
}
/**********************************************************************
@@ -159,63 +182,65 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
* the RecAttr object
* Also set attribute size, array size and attribute type
********************************************************************/
- if (tRecAttr->setup(tAttrInfo, aValue)) {
+ if (tAttr->setup(tAttrInfo, aValue)) {
//setErrorCodeAbort(4000);
- m_ndb->releaseRecAttr(tRecAttr);
+ m_ndb->releaseRecAttr(tAttr);
exit(-1);
- return NULL;
+ DBUG_RETURN(NULL);
}
//theErrorLine++;
- tRecAttr->setNULL();
+ tAttr->setUNDEFINED();
// We want to keep the list sorted to make data insertion easier later
- if (theFirstRecAttr == NULL) {
- theFirstRecAttr = tRecAttr;
- theCurrentRecAttr = tRecAttr;
- tRecAttr->next(NULL);
+
+ if (*theFirstAttr == NULL) {
+ *theFirstAttr = tAttr;
+ *theCurrentAttr = tAttr;
+ tAttr->next(NULL);
} else {
Uint32 tAttrId = tAttrInfo->m_attrId;
- if (tAttrId > theCurrentRecAttr->attrId()) { // right order
- theCurrentRecAttr->next(tRecAttr);
- tRecAttr->next(NULL);
- theCurrentRecAttr = tRecAttr;
- } else if (theFirstRecAttr->next() == NULL || // only one in list
- theFirstRecAttr->attrId() > tAttrId) {// or first
- tRecAttr->next(theFirstRecAttr);
- theFirstRecAttr = tRecAttr;
+ if (tAttrId > (*theCurrentAttr)->attrId()) { // right order
+ (*theCurrentAttr)->next(tAttr);
+ tAttr->next(NULL);
+ *theCurrentAttr = tAttr;
+ } else if ((*theFirstAttr)->next() == NULL || // only one in list
+ (*theFirstAttr)->attrId() > tAttrId) {// or first
+ tAttr->next(*theFirstAttr);
+ *theFirstAttr = tAttr;
} else { // at least 2 in list and not first and not last
- NdbRecAttr *p = theFirstRecAttr;
+ NdbRecAttr *p = *theFirstAttr;
NdbRecAttr *p_next = p->next();
while (tAttrId > p_next->attrId()) {
p = p_next;
p_next = p->next();
}
if (tAttrId == p_next->attrId()) { // Using same attribute twice
- tRecAttr->release(); // do I need to do this?
- m_ndb->releaseRecAttr(tRecAttr);
+ tAttr->release(); // do I need to do this?
+ m_ndb->releaseRecAttr(tAttr);
exit(-1);
- return NULL;
+ DBUG_RETURN(NULL);
}
// this is it, between p and p_next
- p->next(tRecAttr);
- tRecAttr->next(p_next);
+ p->next(tAttr);
+ tAttr->next(p_next);
}
}
-
- return tRecAttr;
+ DBUG_RETURN(tAttr);
}
int
NdbEventOperationImpl::execute()
{
+ DBUG_ENTER("NdbEventOperationImpl::execute");
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
if (!myDict) {
- ndbout_c("NdbEventOperation::execute(): getDictionary=NULL");
- return 0;
+ m_error.code= m_ndb->getNdbError().code;
+ DBUG_RETURN(-1);
}
- if (theFirstRecAttrs[0] == NULL) { // defaults to get all
+ if (theFirstPkAttrs[0] == NULL &&
+ theFirstDataAttrs[0] == NULL) { // defaults to get all
}
@@ -223,13 +248,18 @@ NdbEventOperationImpl::execute()
int hasSubscriber;
- m_bufferId =
- m_bufferHandle->prepareAddSubscribeEvent(m_eventImpl->m_eventId,
- hasSubscriber /* return value */);
+ int r= m_bufferHandle->prepareAddSubscribeEvent(this,
+ hasSubscriber /*return value*/);
+ m_error.code= 4709;
+
+ if (r < 0)
+ {
+ DBUG_RETURN(-1);
+ }
- m_eventImpl->m_bufferId = m_bufferId;
+ m_eventImpl->m_bufferId = m_bufferId = (Uint32)r;
- int r = -1;
+ r = -1;
if (m_bufferId >= 0) {
// now we check if there's already a subscriber
@@ -241,30 +271,33 @@ NdbEventOperationImpl::execute()
if (r) {
//Error
m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
- m_state = NdbEventOperation::ERROR;
+ m_state = EO_ERROR;
} else {
m_bufferHandle->addSubscribeEvent(m_bufferId, this);
- m_state = NdbEventOperation::EXECUTING;
+ m_state = EO_EXECUTING;
}
} else {
//Error
- m_state = NdbEventOperation::ERROR;
+ m_state = EO_ERROR;
}
- return r;
+ DBUG_RETURN(r);
}
int
NdbEventOperationImpl::stop()
{
- if (m_state != NdbEventOperation::EXECUTING)
- return -1;
+ DBUG_ENTER("NdbEventOperationImpl::stop");
+ if (m_state != EO_EXECUTING)
+ {
+ DBUG_RETURN(-1);
+ }
// ndbout_c("NdbEventOperation::stopping()");
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
if (!myDict) {
- ndbout_c("NdbEventOperation::stop(): getDictionary=NULL");
- return 0;
+ m_error.code= m_ndb->getNdbError().code;
+ DBUG_RETURN(-1);
}
NdbDictionaryImpl & myDictImpl = NdbDictionaryImpl::getImpl(*myDict);
@@ -275,8 +308,8 @@ NdbEventOperationImpl::stop()
hasSubscriber /* return value */);
if (ret < 0) {
- ndbout_c("prepareDropSubscribeEvent failed");
- return -1;
+ m_error.code= 4712;
+ DBUG_RETURN(-1);
}
// m_eventImpl->m_bufferId = m_bufferId;
@@ -293,17 +326,17 @@ NdbEventOperationImpl::stop()
if (r) {
//Error
m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId);
- m_state = NdbEventOperation::ERROR;
+ m_error.code= myDictImpl.m_error.code;
+ m_state = EO_ERROR;
} else {
#ifdef EVENT_DEBUG
ndbout_c("NdbEventOperation::dropping()");
#endif
m_bufferHandle->dropSubscribeEvent(m_bufferId);
- m_state = NdbEventOperation::CREATED;
+ m_state = EO_CREATED;
}
-
- return r;
+ DBUG_RETURN(r);
}
bool
@@ -327,6 +360,7 @@ NdbEventOperationImpl::getLatestGCI()
int
NdbEventOperationImpl::next(int *pOverrun)
{
+ DBUG_ENTER("NdbEventOperationImpl::next");
int nr = 10000; // a high value
int tmpOverrun = 0;
int *ptmpOverrun;
@@ -343,7 +377,10 @@ NdbEventOperationImpl::next(int *pOverrun)
*pOverrun = tmpOverrun;
}
- if (r <= 0) return r; // no data
+ if (r <= 0)
+ {
+ DBUG_RETURN(r); // no data
+ }
if (r < nr) r = nr; else nr--; // we don't want to be stuck here forever
@@ -352,8 +389,13 @@ NdbEventOperationImpl::next(int *pOverrun)
#endif
// now move the data into the RecAttrs
- if ((theFirstRecAttrs[0] == NULL) &&
- (theFirstRecAttrs[1] == NULL)) return r;
+ if ((theFirstPkAttrs[0] == NULL) &&
+ (theFirstPkAttrs[1] == NULL) &&
+ (theFirstDataAttrs[0] == NULL) &&
+ (theFirstDataAttrs[1] == NULL))
+ {
+ DBUG_RETURN(r);
+ }
// no copying since no RecAttr's
@@ -364,20 +406,37 @@ NdbEventOperationImpl::next(int *pOverrun)
#ifdef EVENT_DEBUG
int i;
printf("after values sz=%u\n", ptr[1].sz);
- for (i=0; i < ptr[1].sz; i++)
+ for(i=0; i < (int)ptr[1].sz; i++)
printf ("H'%.8X ",ptr[1].p[i]);
printf("\n");
printf("before values sz=%u\n", ptr[2].sz);
- for (i=0; i < ptr[2].sz; i++)
+ for(i=0; i < (int)ptr[2].sz; i++)
printf ("H'%.8X ",ptr[2].p[i]);
printf("\n");
#endif
- NdbRecAttr *tWorkingRecAttr = theFirstRecAttrs[0];
-
// copy data into the RecAttr's
// we assume that the respective attribute lists are sorted
+ // first the pk's
+ {
+ NdbRecAttr *tAttr= theFirstPkAttrs[0];
+ while(tAttr)
+ {
+ assert(aAttrPtr < aAttrEndPtr);
+ unsigned tDataSz= AttributeHeader(*aAttrPtr).getDataSize();
+ assert(tAttr->attrId() ==
+ AttributeHeader(*aAttrPtr).getAttributeId());
+ assert(tAttr->receive_data(aDataPtr, tDataSz));
+ // next
+ aAttrPtr++;
+ aDataPtr+= tDataSz;
+ tAttr= tAttr->next();
+ }
+ }
+
+ NdbRecAttr *tWorkingRecAttr = theFirstDataAttrs[0];
+
Uint32 tRecAttrId;
Uint32 tAttrId;
Uint32 tDataSz;
@@ -389,7 +448,7 @@ NdbEventOperationImpl::next(int *pOverrun)
while (tAttrId > tRecAttrId) {
//printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
- tWorkingRecAttr->setNULL();
+ tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
if (tWorkingRecAttr == NULL)
break;
@@ -401,32 +460,25 @@ NdbEventOperationImpl::next(int *pOverrun)
//printf("[%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
if (tAttrId == tRecAttrId) {
- if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
- hasSomeData++;
+ hasSomeData++;
//printf("set!\n");
- tWorkingRecAttr->receive_data(aDataPtr, tDataSz);
-
- // move forward, data has already moved forward
- aAttrPtr++;
- aDataPtr += tDataSz;
+ assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz));
tWorkingRecAttr = tWorkingRecAttr->next();
- } else {
- // move only attr forward
- aAttrPtr++;
- aDataPtr += tDataSz;
}
+ aAttrPtr++;
+ aDataPtr += tDataSz;
}
while (tWorkingRecAttr != NULL) {
tRecAttrId = tWorkingRecAttr->attrId();
//printf("set undefined [%u] %u %u [%u]\n", tAttrId, tDataSz, *aDataPtr, tRecAttrId);
- tWorkingRecAttr->setNULL();
+ tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
}
- tWorkingRecAttr = theFirstRecAttrs[1];
+ tWorkingRecAttr = theFirstDataAttrs[1];
aDataPtr = ptr[2].p;
Uint32 *aDataEndPtr = aDataPtr + ptr[2].sz;
while ((aDataPtr < aDataEndPtr) && (tWorkingRecAttr != NULL)) {
@@ -435,7 +487,7 @@ NdbEventOperationImpl::next(int *pOverrun)
tDataSz = AttributeHeader(*aDataPtr).getDataSize();
aDataPtr++;
while (tAttrId > tRecAttrId) {
- tWorkingRecAttr->setNULL();
+ tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
if (tWorkingRecAttr == NULL)
break;
@@ -444,27 +496,25 @@ NdbEventOperationImpl::next(int *pOverrun)
if (tWorkingRecAttr == NULL)
break;
if (tAttrId == tRecAttrId) {
- if (!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey())
- hasSomeData++;
+ assert(!m_eventImpl->m_tableImpl->getColumn(tRecAttrId)->getPrimaryKey());
+ hasSomeData++;
- tWorkingRecAttr->receive_data(aDataPtr, tDataSz);
- aDataPtr += tDataSz;
- // move forward, data+attr has already moved forward
+ assert(tWorkingRecAttr->receive_data(aDataPtr, tDataSz));
tWorkingRecAttr = tWorkingRecAttr->next();
- } else {
- // move only data+attr forward
- aDataPtr += tDataSz;
}
+ aDataPtr += tDataSz;
}
while (tWorkingRecAttr != NULL) {
- tWorkingRecAttr->setNULL();
+ tWorkingRecAttr->setUNDEFINED();
tWorkingRecAttr = tWorkingRecAttr->next();
}
if (hasSomeData)
- return r;
+ {
+ DBUG_RETURN(r);
+ }
}
- return 0;
+ DBUG_RETURN(0);
}
NdbDictionary::Event::TableEvent
@@ -487,10 +537,20 @@ NdbEventOperationImpl::getEventType()
void
NdbEventOperationImpl::print()
{
+ int i;
ndbout << "EventId " << m_eventId << "\n";
- for (int i = 0; i < 2; i++) {
- NdbRecAttr *p = theFirstRecAttrs[i];
+ for (i = 0; i < 2; i++) {
+ NdbRecAttr *p = theFirstPkAttrs[i];
+ ndbout << " %u " << i;
+ while (p) {
+ ndbout << " : " << p->attrId() << " = " << *p;
+ p = p->next();
+ }
+ ndbout << "\n";
+ }
+ for (i = 0; i < 2; i++) {
+ NdbRecAttr *p = theFirstDataAttrs[i];
ndbout << " %u " << i;
while (p) {
ndbout << " : " << p->attrId() << " = " << *p;
@@ -639,23 +699,28 @@ NdbGlobalEventBufferHandle::~NdbGlobalEventBufferHandle()
void
NdbGlobalEventBufferHandle::addBufferId(int bufferId)
{
+ DBUG_ENTER("NdbGlobalEventBufferHandle::addBufferId");
+ DBUG_PRINT("enter",("bufferId=%d",bufferId));
if (m_nids >= NDB_MAX_ACTIVE_EVENTS) {
ndbout_c("NdbGlobalEventBufferHandle::addBufferId error in paramerer setting");
exit(-1);
}
m_bufferIds[m_nids] = bufferId;
m_nids++;
+ DBUG_VOID_RETURN;
}
void
NdbGlobalEventBufferHandle::dropBufferId(int bufferId)
{
+ DBUG_ENTER("NdbGlobalEventBufferHandle::dropBufferId");
+ DBUG_PRINT("enter",("bufferId=%d",bufferId));
for (int i = 0; i < m_nids; i++)
if (m_bufferIds[i] == bufferId) {
m_nids--;
for (; i < m_nids; i++)
m_bufferIds[i] = m_bufferIds[i+1];
- return;
+ DBUG_VOID_RETURN;
}
ndbout_c("NdbGlobalEventBufferHandle::dropBufferId %d does not exist",
bufferId);
@@ -674,10 +739,11 @@ NdbGlobalEventBufferHandle::drop(NdbGlobalEventBufferHandle *handle)
}
*/
int
-NdbGlobalEventBufferHandle::prepareAddSubscribeEvent(Uint32 eventId,
- int& hasSubscriber)
+NdbGlobalEventBufferHandle::prepareAddSubscribeEvent
+(NdbEventOperationImpl *eventOp, int& hasSubscriber)
{
- ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventId, hasSubscriber));
+ ADD_DROP_LOCK_GUARDR(int,real_prepareAddSubscribeEvent(this, eventOp,
+ hasSubscriber));
}
void
NdbGlobalEventBufferHandle::addSubscribeEvent
@@ -830,57 +896,68 @@ NdbGlobalEventBuffer::~NdbGlobalEventBuffer()
// NdbMem_Deallocate(m_eventBufferIdToEventId);
}
void
-NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h,
+NdbGlobalEventBuffer::real_init (NdbGlobalEventBufferHandle *h,
int MAX_NUMBER_ACTIVE_EVENTS)
{
- if (m_handlers.size() == 0) { // First init
+ DBUG_ENTER("NdbGlobalEventBuffer::real_init");
+ DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h));
+ if (m_handlers.size() == 0)
+ { // First init
+ DBUG_PRINT("info",("first to come"));
m_max = MAX_NUMBER_ACTIVE_EVENTS;
m_buf = new BufItem[m_max];
- // (BufItem *)NdbMem_Allocate(m_max*sizeof(BufItem));
-
for (int i=0; i<m_max; i++) {
- m_buf[i].gId = 0;
+ m_buf[i].gId= 0;
}
}
+ assert(m_max == MAX_NUMBER_ACTIVE_EVENTS);
// TODO make sure we don't hit roof
- // m_handlers[m_nhandlers] = h;
m_handlers.push_back(h);
- // ndbout_c("NdbGlobalEventBuffer::real_init(), m_handles=%u %u", m_nhandlers, h);
+ DBUG_VOID_RETURN;
}
void
NdbGlobalEventBuffer::real_remove(NdbGlobalEventBufferHandle *h)
{
- // ndbout_c("NdbGlobalEventBuffer::real_init_remove(), m_handles=%u %u", m_nhandlers, h);
- for (Uint32 i=0 ; i < m_handlers.size(); i++) {
- // ndbout_c("%u %u %u", i, m_handlers[i], h);
- if (m_handlers[i] == h) {
+ DBUG_ENTER("NdbGlobalEventBuffer::real_remove");
+ DBUG_PRINT("enter",("m_handles.size()=%u %u", m_handlers.size(), h));
+ for (Uint32 i=0 ; i < m_handlers.size(); i++)
+ {
+ DBUG_PRINT("info",("m_handlers[%u] %u", i, m_handlers[i]));
+ if (m_handlers[i] == h)
+ {
m_handlers.erase(i);
- if (m_handlers.size() == 0) {
- // ndbout_c("last to go");
+ if (m_handlers.size() == 0)
+ {
+ DBUG_PRINT("info",("last to go"));
delete[] m_buf;
m_buf = NULL;
- // NdbMem_Free((char*)m_buf);
}
- return;
+ DBUG_VOID_RETURN;
}
}
- ndbout_c("NdbGlobalEventBuffer::real_init_remove() non-existing handle");
- exit(-1);
+ ndbout_c("NdbGlobalEventBuffer::real_remove() non-existing handle");
+ DBUG_PRINT("error",("non-existing handle"));
+ abort();
+ DBUG_VOID_RETURN;
}
-int
+int
NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
-(NdbGlobalEventBufferHandle *aHandle, Uint32 eventId, int& hasSubscriber)
+(NdbGlobalEventBufferHandle *aHandle, NdbEventOperationImpl *eventOp,
+ int& hasSubscriber)
{
+ DBUG_ENTER("NdbGlobalEventBuffer::real_prepareAddSubscribeEvent");
int i;
- int bufferId = -1;
+ int bufferId= -1;
+ Uint32 eventId= eventOp->m_eventId;
+ DBUG_PRINT("enter",("eventId: %u", eventId));
// add_drop_lock(); // only one thread can do add or drop at a time
// Find place where eventId already set
for (i=0; i<m_no; i++) {
if (m_buf[i].gId == eventId) {
- bufferId = i;
+ bufferId= i;
break;
}
}
@@ -888,53 +965,55 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
// find space for new bufferId
for (i=0; i<m_no; i++) {
if (m_buf[i].gId == 0) {
- bufferId = i; // we found an empty spot
- break;
+ bufferId= i; // we found an empty spot
+ goto found_bufferId;
}
}
if (bufferId < 0 &&
m_no < m_max) {
// room for more so get that
- bufferId=m_no;
- m_buf[m_no].gId = 0;
+ bufferId= m_no;
+ m_buf[m_no].gId= 0;
m_no++;
} else {
- ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers");
- // add_drop_unlock();
- return -1;
+ // add_drop_unlock();
+ DBUG_PRINT("error",("Can't accept more subscribers:"
+ " bufferId=%d, m_no=%d, m_max=%d",
+ bufferId, m_no, m_max));
+ DBUG_RETURN(-1);
}
}
+found_bufferId:
- BufItem &b = m_buf[ID(bufferId)];
+ BufItem &b= m_buf[ID(bufferId)];
if (b.gId == 0) { // first subscriber needs some initialization
- bufferId = NO_ID(0, bufferId);
+ bufferId= NO_ID(0, bufferId);
- b.gId = eventId;
+ b.gId= eventId;
+ b.eventType= (Uint32)eventOp->m_eventImpl->mi_type;
- if ((b.p_buf_mutex = NdbMutex_Create()) == NULL) {
+ if ((b.p_buf_mutex= NdbMutex_Create()) == NULL) {
ndbout_c("NdbGlobalEventBuffer: NdbMutex_Create() failed");
- exit(-1);
+ abort();
}
- b.subs = 0;
- b.f = 0;
- b.sz = 0;
- b.max_sz = aHandle->m_bufferL;
- b.data =
+ b.subs= 0;
+ b.f= 0;
+ b.sz= 0;
+ b.max_sz= aHandle->m_bufferL;
+ b.data=
(BufItem::Data *)NdbMem_Allocate(b.max_sz*sizeof(BufItem::Data));
for (int i = 0; i < b.max_sz; i++) {
- b.data[i].sdata = NULL;
- b.data[i].ptr[0].p = NULL;
- b.data[i].ptr[1].p = NULL;
- b.data[i].ptr[2].p = NULL;
+ b.data[i].sdata= NULL;
+ b.data[i].ptr[0].p= NULL;
+ b.data[i].ptr[1].p= NULL;
+ b.data[i].ptr[2].p= NULL;
}
} else {
-#ifdef EVENT_DEBUG
- ndbout_c("NdbGlobalEventBuffer::prepareAddSubscribeEvent: TRYING handle one subscriber per event b.subs = %u", b.subs);
-#endif
-
+ DBUG_PRINT("info",
+ ("TRYING handle one subscriber per event b.subs=%u",b.subs));
int ni = -1;
for(int i=0; i < b.subs;i++) {
if (b.ps[i].theHandle == NULL) {
@@ -946,9 +1025,10 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
if (b.subs < MAX_SUBSCRIBERS_PER_EVENT) {
ni = b.subs;
} else {
- ndbout_c("prepareAddSubscribeEvent: Can't accept more subscribers");
+ DBUG_PRINT("error",
+ ("Can't accept more subscribers: b.subs=%d",b.subs));
// add_drop_unlock();
- return -1;
+ DBUG_RETURN(-1);
}
}
bufferId = NO_ID(ni, bufferId);
@@ -969,23 +1049,25 @@ NdbGlobalEventBuffer::real_prepareAddSubscribeEvent
else
hasSubscriber = 0;
-#ifdef EVENT_DEBUG
- ndbout_c("prepareAddSubscribeEvent: handed out bufferId %d for eventId %d",
- bufferId, eventId);
-#endif
+ DBUG_PRINT("info",("handed out bufferId=%d for eventId=%d hasSubscriber=%d",
+ bufferId, eventId, hasSubscriber));
/* we now have a lock on the prepare so that no one can mess with this
* unlock comes in unprepareAddSubscribeEvent or addSubscribeEvent
*/
- return bufferId;
+ DBUG_RETURN(bufferId);
}
void
NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
{
+ DBUG_ENTER("NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent");
BufItem &b = m_buf[ID(bufferId)];
int n = NO(bufferId);
+ DBUG_PRINT("enter", ("bufferId=%d,ID(bufferId)=%d,NO(bufferId)=%d",
+ bufferId, ID(bufferId), NO(bufferId)));
+
b.ps[n].theHandle = NULL;
// remove subscribers from the end,
@@ -998,10 +1080,8 @@ NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
break;
if (b.subs == 0) {
-#ifdef EVENT_DEBUG
- ndbout_c("unprepareAddSubscribeEvent: no more subscribers left on eventId %d", b.gId);
-#endif
- b.gId = 0; // We don't have any subscribers, reuse BufItem
+ DBUG_PRINT("info",("no more subscribers left on eventId %d", b.gId));
+ b.gId= 0; // We don't have any subscribers, reuse BufItem
if (b.data) {
NdbMem_Free((void *)b.data);
b.data = NULL;
@@ -1012,12 +1092,14 @@ NdbGlobalEventBuffer::real_unprepareAddSubscribeEvent(int bufferId)
}
}
// add_drop_unlock();
+ DBUG_VOID_RETURN;
}
void
NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId,
void *ndbEventOperation)
{
+ DBUG_ENTER("NdbGlobalEventBuffer::real_addSubscribeEvent");
BufItem &b = m_buf[ID(bufferId)];
int n = NO(bufferId);
@@ -1025,9 +1107,8 @@ NdbGlobalEventBuffer::real_addSubscribeEvent(int bufferId,
b.ps[n].theHandle->addBufferId(bufferId);
// add_drop_unlock();
-#ifdef EVENT_DEBUG
- ndbout_c("addSubscribeEvent:: added bufferId %d", bufferId);
-#endif
+ DBUG_PRINT("info",("added bufferId %d", bufferId));
+ DBUG_VOID_RETURN;
}
void
@@ -1040,6 +1121,7 @@ int
NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId,
int& hasSubscriber)
{
+ DBUG_ENTER("NdbGlobalEventBuffer::real_prepareDropSubscribeEvent");
// add_drop_lock(); // only one thread can do add or drop at a time
BufItem &b = m_buf[ID(bufferId)];
@@ -1055,14 +1137,17 @@ NdbGlobalEventBuffer::real_prepareDropSubscribeEvent(int bufferId,
else if (n == 1)
hasSubscriber = 0;
else
- return -1;
+ {
+ DBUG_RETURN(-1);
+ }
- return 0;
+ DBUG_RETURN(0);
}
void
NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId)
{
+ DBUG_ENTER("NdbGlobalEventBuffer::real_dropSubscribeEvent");
// add_drop_lock(); // only one thread can do add-drop at a time
BufItem &b = m_buf[ID(bufferId)];
@@ -1078,6 +1163,7 @@ NdbGlobalEventBuffer::real_dropSubscribeEvent(int bufferId)
#ifdef EVENT_DEBUG
ndbout_c("dropSubscribeEvent:: dropped bufferId %d", bufferId);
#endif
+ DBUG_VOID_RETURN;
}
void
@@ -1100,10 +1186,13 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
const SubTableData * const sdata,
LinearSectionPtr ptr[3])
{
+ DBUG_ENTER("NdbGlobalEventBuffer::real_insertDataL");
BufItem &b = m_buf[ID(bufferId)];
#ifdef EVENT_DEBUG
int n = NO(bufferId);
#endif
+
+ if ( b.eventType & (1 << (Uint32)sdata->operation) )
{
if (b.subs) {
#ifdef EVENT_DEBUG
@@ -1112,7 +1201,9 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
// move front forward
if (copy_data_alloc(sdata, ptr,
b.data[b.f].sdata, b.data[b.f].ptr))
- return -1;
+ {
+ DBUG_RETURN(-1);
+ }
for (int i=0; i < b.subs; i++) {
NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[i];
if (e.theHandle) { // active subscriber
@@ -1120,7 +1211,7 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
if (e.bufferempty == 0) {
e.overrun++; // another item has been overwritten
e.b++; // move next-to-read next since old item was overwritten
- if (e.b == b.max_sz) e.b = 0; // start from beginning
+ if (e.b == b.max_sz) e.b= 0; // start from beginning
}
}
e.bufferempty = 0;
@@ -1140,21 +1231,35 @@ NdbGlobalEventBuffer::real_insertDataL(int bufferId,
#endif
}
}
- return 0;
+ else
+ {
+#ifdef EVENT_DEBUG
+ ndbout_c("skipped");
+#endif
+ }
+
+ DBUG_RETURN(0);
}
int NdbGlobalEventBuffer::hasData(int bufferId) {
+ DBUG_ENTER("NdbGlobalEventBuffer::hasData");
BufItem &b = m_buf[ID(bufferId)];
int n = NO(bufferId);
NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
if(e.bufferempty)
- return 0;
+ {
+ DBUG_RETURN(0);
+ }
if (b.f <= e.b)
- return b.max_sz-e.b + b.f;
+ {
+ DBUG_RETURN(b.max_sz-e.b + b.f);
+ }
else
- return b.f-e.b;
+ {
+ DBUG_RETURN(b.f-e.b);
+ }
}
int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
@@ -1162,6 +1267,7 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
LinearSectionPtr ptr[3],
int *pOverrun)
{
+ DBUG_ENTER("NdbGlobalEventBuffer::real_getDataL");
BufItem &b = m_buf[ID(bufferId)];
int n = NO(bufferId);
NdbGlobalEventBuffer::BufItem::Ps &e = b.ps[n];
@@ -1172,13 +1278,20 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
}
if (e.bufferempty)
- return 0; // nothing to get
+ {
+ DBUG_RETURN(0); // nothing to get
+ }
+
+ DBUG_PRINT("info",("ID(bufferId) %d NO(bufferId) %d e.b %d",
+ ID(bufferId), NO(bufferId), e.b));
if (copy_data_alloc(b.data[e.b].sdata, b.data[e.b].ptr,
sdata, ptr))
- return -1;
+ {
+ DBUG_RETURN(-1);
+ }
- e.b++; if (e.b == b.max_sz) e.b = 0; // move next-to-read forward
+ e.b++; if (e.b == b.max_sz) e.b= 0; // move next-to-read forward
if (b.f == e.b) // back has cought up with front
e.bufferempty = 1;
@@ -1187,7 +1300,7 @@ int NdbGlobalEventBuffer::real_getDataL(const int bufferId,
ndbout_c("getting data from buffer %d with eventId %d", bufferId, b.gId);
#endif
- return hasData(bufferId)+1;
+ DBUG_RETURN(hasData(bufferId)+1);
}
int
NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
@@ -1195,49 +1308,59 @@ NdbGlobalEventBuffer::copy_data_alloc(const SubTableData * const f_sdata,
SubTableData * &t_sdata,
LinearSectionPtr t_ptr[3])
{
- if (t_sdata == NULL) {
- t_sdata = (SubTableData *)NdbMem_Allocate(sizeof(SubTableData));
- }
+ DBUG_ENTER("NdbGlobalEventBuffer::copy_data_alloc");
+ unsigned sz4= (sizeof(SubTableData)+3)>>2;
+ Uint32 *ptr= (Uint32*)NdbMem_Allocate((sz4 +
+ f_ptr[0].sz +
+ f_ptr[1].sz +
+ f_ptr[2].sz) * sizeof(Uint32));
+ if (t_sdata)
+ NdbMem_Free((char*)t_sdata);
+ t_sdata= (SubTableData *)ptr;
memcpy(t_sdata,f_sdata,sizeof(SubTableData));
+ ptr+= sz4;
+
for (int i = 0; i < 3; i++) {
LinearSectionPtr & f_p = f_ptr[i];
LinearSectionPtr & t_p = t_ptr[i];
if (f_p.sz > 0) {
- if (t_p.p == NULL) {
- t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz);
- } else if (t_p.sz != f_p.sz) {
- NdbMem_Free(t_p.p);
- t_p.p = (Uint32 *)NdbMem_Allocate(sizeof(Uint32)*f_p.sz);
- }
+ t_p.p= (Uint32 *)ptr;
memcpy(t_p.p, f_p.p, sizeof(Uint32)*f_p.sz);
- } else if (t_p.p != NULL) {
- NdbMem_Free(t_p.p);
- t_p.p = NULL;
+ ptr+= f_p.sz;
+ t_p.sz= f_p.sz;
+ } else {
+ t_p.p= NULL;
+ t_p.sz= 0;
}
- t_p.sz = f_p.sz;
}
- return 0;
+ DBUG_RETURN(0);
}
int
NdbGlobalEventBuffer::real_wait(NdbGlobalEventBufferHandle *h,
int aMillisecondNumber)
{
+ DBUG_ENTER("NdbGlobalEventBuffer::real_wait");
// check if there are anything in any of the buffers
int i;
int n = 0;
for (i = 0; i < h->m_nids; i++)
n += hasData(h->m_bufferIds[i]);
- if (n) return n;
+ if (n)
+ {
+ DBUG_RETURN(n);
+ }
int r = NdbCondition_WaitTimeout(h->p_cond, ndb_global_event_buffer_mutex,
aMillisecondNumber);
if (r > 0)
- return -1;
+ {
+ DBUG_RETURN(-1);
+ }
n = 0;
for (i = 0; i < h->m_nids; i++)
n += hasData(h->m_bufferIds[i]);
- return n;
+ DBUG_RETURN(n);
}
template class Vector<NdbGlobalEventBufferHandle*>;