diff options
Diffstat (limited to 'network.cpp')
-rw-r--r-- | network.cpp | 159 |
1 files changed, 92 insertions, 67 deletions
diff --git a/network.cpp b/network.cpp index 72001d4e..f7705a70 100644 --- a/network.cpp +++ b/network.cpp @@ -2,6 +2,7 @@ #include "pch.h" #include "network.h" +#include "wait.h" NAMESPACE_BEGIN(CryptoPP) @@ -33,10 +34,24 @@ bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking) #ifdef HIGHRES_TIMER_AVAILABLE NetworkSource::NetworkSource(BufferedTransformation *attachment) - : NonblockingSource(attachment), m_buf(1024*4), m_bufSize(0), m_state(NORMAL) + : NonblockingSource(attachment), m_buf(1024*16) + , m_waitingForResult(false), m_outputBlocked(false) + , m_dataBegin(0), m_dataEnd(0) { } +void NetworkSource::GetWaitObjects(WaitObjectContainer &container) +{ + if (!m_outputBlocked) + { + if (m_dataBegin == m_dataEnd) + AccessReceiver().GetWaitObjects(container); + else + container.SetNoWait(); + } + AttachedTransformation()->GetWaitObjects(container); +} + unsigned int NetworkSource::GeneralPump2(unsigned long &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter) { NetworkReceiver &receiver = AccessReceiver(); @@ -45,80 +60,93 @@ unsigned int NetworkSource::GeneralPump2(unsigned long &byteCount, bool blocking byteCount = 0; bool forever = maxTime == INFINITE_TIME; Timer timer(Timer::MILLISECONDS, forever); - unsigned long timeout; BufferedTransformation *t = AttachedTransformation(); - if (m_state == OUTPUT_BLOCKED) + if (m_outputBlocked) goto DoOutput; while (true) { - if (m_state == WAITING_FOR_RESULT) + if (m_dataBegin == m_dataEnd) { - if (receiver.MustWaitForResult()) + if (receiver.EofReceived()) + break; + + if (m_waitingForResult) { - timeout = SaturatingSubtract(maxTime, timer.ElapsedTime()); - if (!receiver.Wait(timeout)) + if (receiver.MustWaitForResult() && !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()))) break; + + unsigned int recvResult = receiver.GetReceiveResult(); + m_dataEnd += recvResult; + m_waitingForResult = false; + + if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size()) + goto ReceiveNoWait; } + else + { + m_dataEnd = m_dataBegin = 0; - unsigned int recvResult = receiver.GetReceiveResult(); -// assert(recvResult > 0 || receiver.EofReceived()); - m_bufSize += recvResult; - m_state = NORMAL; - } + if (receiver.MustWaitToReceive()) + { + if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()))) + break; - if (m_bufSize == 0) - { - if (receiver.EofReceived()) - break; + receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd); + m_waitingForResult = true; + } + else + { +ReceiveNoWait: + m_waitingForResult = true; + // call Receive repeatedly as long as data is immediately available, + // because some receivers tend to return data in small pieces + while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd)) + { + unsigned int recvResult = receiver.GetReceiveResult(); + m_dataEnd += recvResult; + if (receiver.EofReceived() || m_dataEnd == m_buf.size()) + { + m_waitingForResult = false; + break; + } + } + } + } } else { - m_putSize = STDMIN((unsigned long)m_bufSize, maxSize - byteCount); + m_putSize = STDMIN((unsigned long)m_dataEnd-m_dataBegin, maxSize-byteCount); if (checkDelimiter) - m_putSize = std::find(m_buf.begin(), m_buf+m_putSize, delimiter) - m_buf; + m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin); DoOutput: - unsigned int result = t->PutModifiable2(m_buf, m_putSize, 0, forever || blockingOutput); + unsigned int result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput); if (result) { - timeout = SaturatingSubtract(maxTime, timer.ElapsedTime()); - if (t->Wait(timeout)) + if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()))) goto DoOutput; else { - m_state = OUTPUT_BLOCKED; + m_outputBlocked = true; return result; } } - m_state = NORMAL; + m_outputBlocked = false; byteCount += m_putSize; - m_bufSize -= m_putSize; - if (m_bufSize > 0) - { - memmove(m_buf, m_buf+m_putSize, m_bufSize); - if (checkDelimiter && m_buf[0] == delimiter) - break; - } - } - - if (byteCount == maxSize) - break; - - unsigned long elapsed = timer.ElapsedTime(); - if (elapsed > maxTime) - break; // once time limit is reached, return even if there is more data waiting - - if (receiver.MustWaitToReceive()) - { - if (!receiver.Wait(maxTime - elapsed)) + m_dataBegin += m_putSize; + if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter) + break; + if (byteCount == maxSize) + break; + // once time limit is reached, return even if there is more data waiting + // but make 0 a special case so caller can request a large amount of data to be + // pumped as long as it is immediately available + if (maxTime > 0 && timer.ElapsedTime() > maxTime) break; } - - receiver.Receive(m_buf+m_bufSize, m_buf.size()-m_bufSize); - m_state = WAITING_FOR_RESULT; } return 0; @@ -126,6 +154,12 @@ DoOutput: // ************************************************************* +NetworkSink::NetworkSink(unsigned int maxBufferSize, bool autoFlush) + : m_maxBufferSize(maxBufferSize), m_autoFlush(autoFlush) + , m_needSendResult(false), m_buffer(STDMIN(16U*1024U, maxBufferSize)), m_blockedBytes(0) +{ +} + unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int messageEnd, bool blocking) { if (m_blockedBytes) @@ -134,7 +168,7 @@ unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int me inString += length - m_blockedBytes; length = m_blockedBytes; } - m_buffer.LazyPut(inString, length); + LazyPutter lp(m_buffer, inString, length); unsigned int targetSize = messageEnd ? 0 : m_maxBufferSize; TimedFlush(blocking ? INFINITE_TIME : 0, m_autoFlush ? 0 : targetSize); @@ -144,7 +178,6 @@ unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int me assert(!blocking); m_blockedBytes = STDMIN(m_buffer.CurrentSize() - targetSize, (unsigned long)length); m_buffer.UndoLazyPut(m_blockedBytes); - m_buffer.FinalizeLazyPut(); return STDMAX(m_blockedBytes, 1U); } m_blockedBytes = 0; @@ -156,51 +189,43 @@ unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int me unsigned int NetworkSink::TimedFlush(unsigned long maxTime, unsigned int targetSize) { - if (m_buffer.IsEmpty()) - return 0; - NetworkSender &sender = AccessSender(); bool forever = maxTime == INFINITE_TIME; Timer timer(Timer::MILLISECONDS, forever); - unsigned long timeout; unsigned int totalFlushSize = 0; while (true) { + if (m_buffer.CurrentSize() <= targetSize) + break; + if (m_needSendResult) { - if (sender.MustWaitForResult()) - { - timeout = SaturatingSubtract(maxTime, timer.ElapsedTime()); - if (!sender.Wait(timeout)) - break; - } + if (sender.MustWaitForResult() && !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime()))) + break; unsigned int sendResult = sender.GetSendResult(); m_buffer.Skip(sendResult); totalFlushSize += sendResult; m_needSendResult = false; - if (m_buffer.CurrentSize() <= targetSize) + if (!m_buffer.AnyRetrievable()) break; } - unsigned long elapsed = timer.ElapsedTime(); - if (elapsed > maxTime) - break; // once time limit is reached, return even if there is more data waiting - - if (sender.MustWaitToSend()) - { - if (!sender.Wait(maxTime - elapsed)) - break; - } + unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0; + if (sender.MustWaitToSend() && !sender.Wait(timeOut)) + break; unsigned int contiguousSize = 0; const byte *block = m_buffer.Spy(contiguousSize); sender.Send(block, contiguousSize); m_needSendResult = true; + + if (maxTime > 0 && timeOut == 0) + break; // once time limit is reached, return even if there is more data waiting } return totalFlushSize; |