summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorClifford Jansen <cliffjansen@apache.org>2014-09-09 02:19:38 +0000
committerClifford Jansen <cliffjansen@apache.org>2014-09-09 02:19:38 +0000
commit6b86c37821372b9a66ba58db03a19b48433e2971 (patch)
treed05b6c42ffabc2e2f766cde91f34f69c613b7848 /qpid/cpp/src
parentb9c60a6c18b281c2e0e0b534365f5c7e78d41795 (diff)
downloadqpid-python-6b86c37821372b9a66ba58db03a19b48433e2971.tar.gz
QPID-5033: Bad buffer accounting leading to spurious SSL errors on Windows
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1623614 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIO.h5
-rw-r--r--qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp13
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp212
-rw-r--r--qpid/cpp/src/qpid/sys/windows/AsynchIO.h235
-rw-r--r--qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp90
5 files changed, 338 insertions, 217 deletions
diff --git a/qpid/cpp/src/qpid/sys/AsynchIO.h b/qpid/cpp/src/qpid/sys/AsynchIO.h
index 2bab96c0c5..09402e9e44 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIO.h
+++ b/qpid/cpp/src/qpid/sys/AsynchIO.h
@@ -144,10 +144,9 @@ public:
const static uint32_t MaxBufferSize = 65536;
/*
- * Number of IO buffers allocated - I think the code can only use 2 -
- * 1 for reading and 1 for writing, allocate 4 for safety
+ * Number of IO buffers allocated - 1 for reading and 1 for writing.
*/
- const static uint32_t BufferCount = 4;
+ const static uint32_t BufferCount = 2;
virtual void queueForDeletion() = 0;
diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 7418c93150..eebf02e867 100644
--- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -414,13 +414,14 @@ void AsynchIO::requestedCall(RequestCallback callback) {
* to spare
*/
AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
- // Always keep at least one buffer (it might have data that was "unread" in it)
- if (bufferQueue.size()<=1)
+ BufferBase* buff = bufferQueue.empty() ? 0 : bufferQueue.back();
+ // An "unread" buffer is reserved for future read operations (which
+ // take from the front of the queue).
+ if (!buff || (buff->dataCount && bufferQueue.size() == 1)) {
+ QPID_LOG(error, "No IO buffers available");
return 0;
- BufferBase* buff = bufferQueue.back();
- assert(buff);
- buff->dataStart = 0;
- buff->dataCount = 0;
+ }
+ assert(buff->dataCount == 0);
bufferQueue.pop_back();
return buff;
}
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
index ac17a1757d..d65aad1304 100644
--- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -44,6 +44,7 @@
#include <boost/bind.hpp>
#include <boost/shared_array.hpp>
+#include "qpid/sys/windows/AsynchIO.h"
namespace {
@@ -82,24 +83,6 @@ namespace windows {
* Asynch Acceptor
*
*/
-class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
-
- friend class AsynchAcceptResult;
-
-public:
- AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
- ~AsynchAcceptor();
- void start(Poller::shared_ptr poller);
-
-private:
- void restart(void);
-
- AsynchAcceptor::Callback acceptedCallback;
- const Socket& socket;
- const SOCKET wSocket;
- const LPFN_ACCEPTEX fnAcceptEx;
-};
-
AsynchAcceptor::AsynchAcceptor(const Socket& s, Callback callback)
: acceptedCallback(callback),
socket(s),
@@ -184,24 +167,6 @@ void AsynchAcceptResult::failure(int /*status*/) {
* event handle to associate with the connecting handle. But there's no
* time for that right now...
*/
-class AsynchConnector : public qpid::sys::AsynchConnector {
-private:
- ConnectedCallback connCallback;
- FailedCallback failCallback;
- const Socket& socket;
- const std::string hostname;
- const std::string port;
-
-public:
- AsynchConnector(const Socket& socket,
- const std::string& hostname,
- const std::string& port,
- ConnectedCallback connCb,
- FailedCallback failCb = 0);
- void start(Poller::shared_ptr poller);
- void requestCallback(RequestCallback rCb);
-};
-
AsynchConnector::AsynchConnector(const Socket& sock,
const std::string& hname,
const std::string& p,
@@ -260,136 +225,6 @@ AsynchConnector* qpid::sys::AsynchConnector::create(const Socket& s,
namespace windows {
-class AsynchIO : public qpid::sys::AsynchIO {
-public:
- AsynchIO(const Socket& s,
- ReadCallback rCb,
- EofCallback eofCb,
- DisconnectCallback disCb,
- ClosedCallback cCb = 0,
- BuffersEmptyCallback eCb = 0,
- IdleCallback iCb = 0);
- ~AsynchIO();
-
- // Methods inherited from qpid::sys::AsynchIO
-
- /**
- * Notify the object is should delete itself as soon as possible.
- */
- virtual void queueForDeletion();
-
- /// Take any actions needed to prepare for working with the poller.
- virtual void start(Poller::shared_ptr poller);
- virtual void createBuffers(uint32_t size);
- virtual void queueReadBuffer(BufferBase* buff);
- virtual void unread(BufferBase* buff);
- virtual void queueWrite(BufferBase* buff);
- virtual void notifyPendingWrite();
- virtual void queueWriteClose();
- virtual bool writeQueueEmpty();
- virtual void requestCallback(RequestCallback);
-
- /**
- * getQueuedBuffer returns a buffer from the buffer queue, if one is
- * available.
- *
- * @retval Pointer to BufferBase buffer; 0 if none is available.
- */
- virtual BufferBase* getQueuedBuffer();
-
- virtual SecuritySettings getSecuritySettings(void);
-
-private:
- ReadCallback readCallback;
- EofCallback eofCallback;
- DisconnectCallback disCallback;
- ClosedCallback closedCallback;
- BuffersEmptyCallback emptyCallback;
- IdleCallback idleCallback;
- const Socket& socket;
- Poller::shared_ptr poller;
-
- std::deque<BufferBase*> bufferQueue;
- std::deque<BufferBase*> writeQueue;
- /* The MSVC-supplied deque is not thread-safe; keep locks to serialize
- * access to the buffer queue and write queue.
- */
- Mutex bufferQueueLock;
- std::vector<BufferBase> buffers;
- boost::shared_array<char> bufferMemory;
-
- // Number of outstanding I/O operations.
- volatile LONG opsInProgress;
- // Is there a write in progress?
- volatile bool writeInProgress;
- // Or a read?
- volatile bool readInProgress;
- // Deletion requested, but there are callbacks in progress.
- volatile bool queuedDelete;
- // Socket close requested, but there are operations in progress.
- volatile bool queuedClose;
-
-private:
- // Dispatch events that have completed.
- void notifyEof(void);
- void notifyDisconnect(void);
- void notifyClosed(void);
- void notifyBuffersEmpty(void);
- void notifyIdle(void);
-
- /**
- * Initiate a write of the specified buffer. There's no callback for
- * write completion to the AsynchIO object.
- */
- void startWrite(AsynchIO::BufferBase* buff);
-
- void close(void);
-
- /**
- * startReading initiates reading, readComplete() is
- * called when the read completes.
- */
- void startReading();
-
- /**
- * readComplete is called when a read request is complete.
- *
- * @param result Results of the operation.
- */
- void readComplete(AsynchReadResult *result);
-
- /**
- * writeComplete is called when a write request is complete.
- *
- * @param result Results of the operation.
- */
- void writeComplete(AsynchWriteResult *result);
-
- /**
- * Queue of completions to run. This queue enforces the requirement
- * from upper layers that only one thread at a time is allowed to act
- * on any given connection. Once a thread is busy processing a completion
- * on this object, other threads that dispatch completions queue the
- * completions here for the in-progress thread to handle when done.
- * Thus, any threads can dispatch a completion from the IocpPoller, but
- * this class ensures that actual processing at the connection level is
- * only on one thread at a time.
- */
- std::queue<AsynchIoResult *> completionQueue;
- volatile bool working;
- Mutex completionLock;
-
- /**
- * Called when there's a completion to process.
- */
- void completion(AsynchIoResult *result);
-
- /**
- * Helper function to facilitate the close operation
- */
- void cancelRead();
-};
-
// This is used to encapsulate pure callbacks into a handle
class CallbackHandle : public IOHandle {
public:
@@ -414,6 +249,7 @@ AsynchIO::AsynchIO(const Socket& s,
emptyCallback(eCb),
idleCallback(iCb),
socket(s),
+ bufferCount(BufferCount),
opsInProgress(0),
writeInProgress(false),
readInProgress(false),
@@ -455,14 +291,19 @@ void AsynchIO::start(Poller::shared_ptr poller0) {
startReading();
}
+uint32_t AsynchIO::getBufferCount(void) { return bufferCount; }
+
+void AsynchIO::setBufferCount(uint32_t count) { bufferCount = count; }
+
+
void AsynchIO::createBuffers(uint32_t size) {
// Allocate all the buffer memory at once
- bufferMemory.reset(new char[size*BufferCount]);
+ bufferMemory.reset(new char[size*bufferCount]);
// Create the Buffer structs in a vector
// And push into the buffer queue
- buffers.reserve(BufferCount);
- for (uint32_t i = 0; i < BufferCount; i++) {
+ buffers.reserve(bufferCount);
+ for (uint32_t i = 0; i < bufferCount; i++) {
buffers.push_back(BufferBase(&bufferMemory[i*size], size));
queueReadBuffer(&buffers[i]);
}
@@ -538,6 +379,9 @@ void AsynchIO::startReading() {
assert(buff);
bufferQueue.pop_front();
}
+ else {
+ logNoBuffers("startReading");
+ }
}
if (buff != 0) {
int readCount = buff->byteCount - buff->dataCount;
@@ -590,12 +434,17 @@ void AsynchIO::requestCallback(RequestCallback callback) {
*/
AsynchIO::BufferBase* AsynchIO::getQueuedBuffer() {
QLock l(bufferQueueLock);
- // Always keep at least one buffer (it might have data that was
- // "unread" in it).
- if (bufferQueue.size() <= 1)
+ BufferBase* buff = bufferQueue.empty() ? 0 : bufferQueue.back();
+ // An "unread" buffer is reserved for future read operations (which
+ // take from the front of the queue).
+ if (!buff || (buff->dataCount && bufferQueue.size() == 1)) {
+ if (buff)
+ logNoBuffers("getQueuedBuffer with unread data");
+ else
+ logNoBuffers("getQueuedBuffer with empty queue");
return 0;
- BufferBase* buff = bufferQueue.back();
- assert(buff);
+ }
+ assert(buff->dataCount == 0);
bufferQueue.pop_back();
return buff;
}
@@ -833,6 +682,21 @@ void AsynchIO::cancelRead() {
socket.close();
}
+/*
+ * Track down cause of unavailable buffer if it recurs: QPID-5033
+ */
+void AsynchIO::logNoBuffers(const char *context) {
+ QPID_LOG(error, "No IO buffers available: " << context <<
+ ". Debug data: " << bufferQueue.size() <<
+ ' ' << writeQueue.size() <<
+ ' ' << completionQueue.size() <<
+ ' ' << opsInProgress <<
+ ' ' << writeInProgress <<
+ ' ' << readInProgress <<
+ ' ' << working);
+}
+
+
} // namespace windows
AsynchIO* qpid::sys::AsynchIO::create(const Socket& s,
diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.h b/qpid/cpp/src/qpid/sys/windows/AsynchIO.h
new file mode 100644
index 0000000000..a50864b561
--- /dev/null
+++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.h
@@ -0,0 +1,235 @@
+#ifndef _sys_windows_AsynchIO
+#define _sys_windows_AsynchIO
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIoResult.h"
+#include "qpid/sys/AsynchIO.h"
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/CommonImportExport.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/shared_array.hpp>
+#include <winsock2.h>
+#include <mswsock.h>
+#include <windows.h>
+
+// security.h needs to see this to distinguish from kernel use.
+#define SECURITY_WIN32
+#include <security.h>
+#include <Schnlsp.h>
+#undef SECURITY_WIN32
+
+namespace qpid {
+namespace sys {
+namespace windows {
+
+/*
+ * Asynch Acceptor
+ */
+
+class AsynchAcceptor : public qpid::sys::AsynchAcceptor {
+
+ friend class AsynchAcceptResult;
+
+public:
+ AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback);
+ ~AsynchAcceptor();
+ void start(Poller::shared_ptr poller);
+
+private:
+ void restart(void);
+
+ AsynchAcceptor::Callback acceptedCallback;
+ const Socket& socket;
+ const SOCKET wSocket;
+ const LPFN_ACCEPTEX fnAcceptEx;
+};
+
+
+class AsynchConnector : public qpid::sys::AsynchConnector {
+private:
+ ConnectedCallback connCallback;
+ FailedCallback failCallback;
+ const Socket& socket;
+ const std::string hostname;
+ const std::string port;
+
+public:
+ AsynchConnector(const Socket& socket,
+ const std::string& hostname,
+ const std::string& port,
+ ConnectedCallback connCb,
+ FailedCallback failCb = 0);
+ void start(Poller::shared_ptr poller);
+ void requestCallback(RequestCallback rCb);
+};
+
+class AsynchIO : public qpid::sys::AsynchIO {
+
+ friend class SslAsynchIO;
+
+public:
+ AsynchIO(const Socket& s,
+ ReadCallback rCb,
+ EofCallback eofCb,
+ DisconnectCallback disCb,
+ ClosedCallback cCb = 0,
+ BuffersEmptyCallback eCb = 0,
+ IdleCallback iCb = 0);
+ ~AsynchIO();
+
+ // Methods inherited from qpid::sys::AsynchIO
+
+ /**
+ * Notify the object is should delete itself as soon as possible.
+ */
+ virtual void queueForDeletion();
+
+ /// Take any actions needed to prepare for working with the poller.
+ virtual void start(Poller::shared_ptr poller);
+ virtual void createBuffers(uint32_t size);
+ virtual void queueReadBuffer(BufferBase* buff);
+ virtual void unread(BufferBase* buff);
+ virtual void queueWrite(BufferBase* buff);
+ virtual void notifyPendingWrite();
+ virtual void queueWriteClose();
+ virtual bool writeQueueEmpty();
+ virtual void requestCallback(RequestCallback);
+
+ /**
+ * getQueuedBuffer returns a buffer from the buffer queue, if one is
+ * available.
+ *
+ * @retval Pointer to BufferBase buffer; 0 if none is available.
+ */
+ virtual BufferBase* getQueuedBuffer();
+
+ virtual SecuritySettings getSecuritySettings(void);
+
+private:
+ ReadCallback readCallback;
+ EofCallback eofCallback;
+ DisconnectCallback disCallback;
+ ClosedCallback closedCallback;
+ BuffersEmptyCallback emptyCallback;
+ IdleCallback idleCallback;
+ const Socket& socket;
+ Poller::shared_ptr poller;
+ uint32_t bufferCount;
+
+ std::deque<BufferBase*> bufferQueue;
+ std::deque<BufferBase*> writeQueue;
+ /* The MSVC-supplied deque is not thread-safe; keep locks to serialize
+ * access to the buffer queue and write queue.
+ */
+ Mutex bufferQueueLock;
+ std::vector<BufferBase> buffers;
+ boost::shared_array<char> bufferMemory;
+
+ // Number of outstanding I/O operations.
+ volatile LONG opsInProgress;
+ // Is there a write in progress?
+ volatile bool writeInProgress;
+ // Or a read?
+ volatile bool readInProgress;
+ // Deletion requested, but there are callbacks in progress.
+ volatile bool queuedDelete;
+ // Socket close requested, but there are operations in progress.
+ volatile bool queuedClose;
+
+protected:
+ uint32_t getBufferCount(void);
+ void setBufferCount(uint32_t);
+
+private:
+ // Dispatch events that have completed.
+ void notifyEof(void);
+ void notifyDisconnect(void);
+ void notifyClosed(void);
+ void notifyBuffersEmpty(void);
+ void notifyIdle(void);
+
+ /**
+ * Initiate a write of the specified buffer. There's no callback for
+ * write completion to the AsynchIO object.
+ */
+ void startWrite(AsynchIO::BufferBase* buff);
+
+ void close(void);
+
+ /**
+ * startReading initiates reading, readComplete() is
+ * called when the read completes.
+ */
+ void startReading();
+
+ /**
+ * readComplete is called when a read request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void readComplete(AsynchReadResult *result);
+
+ /**
+ * writeComplete is called when a write request is complete.
+ *
+ * @param result Results of the operation.
+ */
+ void writeComplete(AsynchWriteResult *result);
+
+ /**
+ * Queue of completions to run. This queue enforces the requirement
+ * from upper layers that only one thread at a time is allowed to act
+ * on any given connection. Once a thread is busy processing a completion
+ * on this object, other threads that dispatch completions queue the
+ * completions here for the in-progress thread to handle when done.
+ * Thus, any threads can dispatch a completion from the IocpPoller, but
+ * this class ensures that actual processing at the connection level is
+ * only on one thread at a time.
+ */
+ std::queue<AsynchIoResult *> completionQueue;
+ volatile bool working;
+ Mutex completionLock;
+
+ /**
+ * Called when there's a completion to process.
+ */
+ void completion(AsynchIoResult *result);
+
+ /**
+ * Helper function to facilitate the close operation
+ */
+ void cancelRead();
+
+ /**
+ * Log information about buffer depletion, which should never happen.
+ * See QPID-5033.
+ */
+ void logNoBuffers(const char*);
+};
+
+}}} // namespace qpid::sys::windows
+
+#endif // _sys_windows_AsynchIO
diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
index 28abc743f4..079627372b 100644
--- a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp
@@ -38,6 +38,7 @@
#include <queue>
#include <boost/bind.hpp>
+#include "AsynchIO.h"
namespace qpid {
namespace sys {
@@ -143,6 +144,9 @@ void SslAsynchIO::start(qpid::sys::Poller::shared_ptr poller) {
}
void SslAsynchIO::createBuffers(uint32_t size) {
+ // Reserve an extra buffer to hold unread plaintext or trailing encrypted input.
+ windows::AsynchIO *waio = dynamic_cast<windows::AsynchIO*>(aio);
+ waio->setBufferCount(waio->getBufferCount() + 1);
aio->createBuffers(size);
}
@@ -298,7 +302,7 @@ void SslAsynchIO::sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff) {
return;
}
- // Decrypt the buffer; if there's legit data, pass it on through.
+ // Decrypt one block; if there's legit data, pass it on through.
// However, it's also possible that the peer hasn't supplied enough
// data yet, or the session needs to be renegotiated, or the session
// is ending.
@@ -344,6 +348,8 @@ void SslAsynchIO::sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff) {
// actual decrypted data.
// If there's extra data, copy that out to a new buffer and run through
// this method again.
+ char *extraBytes = 0;
+ int32_t extraLength = 0;
BufferBase *extraBuff = 0;
for (int i = 0; i < 4; i++) {
switch (recvBuffs[i].BufferType) {
@@ -354,18 +360,8 @@ void SslAsynchIO::sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff) {
buff->dataCount -= recvBuffs[i].cbBuffer;
break;
case SECBUFFER_EXTRA:
- // Very important to get this buffer from the downstream aio.
- // The ones constructed from the local getQueuedBuffer() are
- // restricted size for encrypting. However, data coming up from
- // TCP may have a bunch of SSL segments coalesced and be much
- // larger than the maximum single SSL segment.
- extraBuff = a.getQueuedBuffer();
- if (0 == extraBuff)
- throw QPID_WINDOWS_ERROR(WSAENOBUFS);
- memmove(extraBuff->bytes,
- recvBuffs[i].pvBuffer,
- recvBuffs[i].cbBuffer);
- extraBuff->dataCount = recvBuffs[i].cbBuffer;
+ extraBytes = (char *) recvBuffs[i].pvBuffer;
+ extraLength = recvBuffs[i].cbBuffer;
break;
default:
break;
@@ -381,34 +377,59 @@ void SslAsynchIO::sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff) {
// (so we can count on more bytes being on the way via aio).
do {
BufferBase *temp = 0;
- // Now that buff reflects only decrypted data, see if there was any
- // partial data left over from last time. If so, append this new
+ // See if there was partial data left over from last time. If so, append this new
// data to that and release the current buff back to aio. Assume that
// leftoverPlaintext was squished so the data starts at 0.
if (leftoverPlaintext != 0) {
// There is leftover data; append all the new data that will fit.
int32_t count = buff->dataCount;
- if (leftoverPlaintext->dataCount + count > leftoverPlaintext->byteCount)
- count = (leftoverPlaintext->byteCount - leftoverPlaintext->dataCount);
- ::memmove(&leftoverPlaintext->bytes[leftoverPlaintext->dataCount],
- &buff->bytes[buff->dataStart],
- count);
- leftoverPlaintext->dataCount += count;
- buff->dataCount -= count;
- buff->dataStart += count;
- if (buff->dataCount == 0) {
- a.queueReadBuffer(buff);
- buff = 0;
+ if (count) {
+ if (leftoverPlaintext->dataCount + count > leftoverPlaintext->byteCount)
+ count = (leftoverPlaintext->byteCount - leftoverPlaintext->dataCount);
+ ::memmove(&leftoverPlaintext->bytes[leftoverPlaintext->dataCount],
+ &buff->bytes[buff->dataStart], count);
+ leftoverPlaintext->dataCount += count;
+ buff->dataCount -= count;
+ buff->dataStart += count;
+ // Prepare to pass the buffer up. Beware that the read callback
+ // may do an unread(), so move the leftoverPlaintext pointer
+ // out of the way. It also may release the buffer back to aio,
+ // so in either event, the pointer passed to the callback is not
+ // valid on return.
+ temp = leftoverPlaintext;
+ leftoverPlaintext = 0;
+ }
+ else {
+ // All decrypted data used up, decrypt some more or get more from the aio
+ if (extraLength) {
+ buff->dataStart = extraBytes - buff->bytes;
+ buff->dataCount = extraLength;
+ sslDataIn(a, buff);
+ return;
+ }
+ else {
+ a.queueReadBuffer(buff);
+ return;
+ }
}
- // Prepare to pass the buffer up. Beware that the read callback
- // may do an unread(), so move the leftoverPlaintext pointer
- // out of the way. It also may release the buffer back to aio,
- // so in either event, the pointer passed to the callback is not
- // valid on return.
- temp = leftoverPlaintext;
- leftoverPlaintext = 0;
}
else {
+ // Use buff, but first offload data not yet encrypted
+ if (extraLength) {
+ // Very important to get this buffer from the downstream aio.
+ // The ones constructed from the local getQueuedBuffer() are
+ // restricted size for encrypting. However, data coming up from
+ // TCP may have a bunch of SSL segments coalesced and be much
+ // larger than the maximum single SSL segment.
+ extraBuff = a.getQueuedBuffer();
+ if (0 == extraBuff) {
+ // No leftoverPlaintext, so a spare buffer should be available
+ throw QPID_WINDOWS_ERROR(WSAENOBUFS);
+ }
+ memmove(extraBuff->bytes, extraBytes, extraLength);
+ extraBuff->dataCount = extraLength;
+ extraLength = 0;
+ }
temp = buff;
buff = 0;
}
@@ -423,8 +444,9 @@ void SslAsynchIO::sslDataIn(qpid::sys::AsynchIO& a, BufferBase *buff) {
// Ok, the current decrypted data is done. If there was any extra data,
// go back and handle that.
- if (extraBuff != 0)
+ if (extraBuff != 0) {
sslDataIn(a, extraBuff);
+ }
}
void SslAsynchIO::idle(qpid::sys::AsynchIO&) {