diff options
Diffstat (limited to 'src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc')
-rw-r--r-- | src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc | 36 |
1 files changed, 25 insertions, 11 deletions
diff --git a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc index 67e41cec2..f520841a4 100644 --- a/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc +++ b/src/components/transport_manager/src/transport_adapter/threaded_socket_connection.cc @@ -30,6 +30,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include <algorithm> #include <errno.h> #include <fcntl.h> #include <memory.h> @@ -183,14 +184,23 @@ void ThreadedSocketConnection::threadMain() { } } +bool ThreadedSocketConnection::IsFramesToSendQueueEmpty() const { + // Check Frames queue is empty or not + sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); + return frames_to_send_.empty(); +} + void ThreadedSocketConnection::Transmit() { LOG4CXX_AUTO_TRACE(logger_); const nfds_t kPollFdsSize = 2; pollfd poll_fds[kPollFdsSize]; poll_fds[0].fd = socket_; + + const bool is_queue_empty_on_poll = IsFramesToSendQueueEmpty(); + poll_fds[0].events = POLLIN | POLLPRI - | (frames_to_send_.empty() ? 0 : POLLOUT); + | (is_queue_empty_on_poll ? 0 : POLLOUT); poll_fds[1].fd = read_fd_; poll_fds[1].events = POLLIN | POLLPRI; @@ -231,8 +241,10 @@ void ThreadedSocketConnection::Transmit() { return; } - // send data if possible - if (!frames_to_send_.empty() && (poll_fds[0].revents | POLLOUT)) { + const bool is_queue_empty = IsFramesToSendQueueEmpty(); + + // Send data if possible + if (!is_queue_empty && (poll_fds[0].revents | POLLOUT)) { LOG4CXX_DEBUG(logger_, "frames_to_send_ not empty() "); // send data @@ -288,15 +300,17 @@ bool ThreadedSocketConnection::Receive() { bool ThreadedSocketConnection::Send() { LOG4CXX_AUTO_TRACE(logger_); - FrameQueue frames_to_send; - frames_to_send_mutex_.Acquire(); - std::swap(frames_to_send, frames_to_send_); - frames_to_send_mutex_.Release(); + FrameQueue frames_to_send_local; + + { + sync_primitives::AutoLock auto_lock(frames_to_send_mutex_); + std::swap(frames_to_send_local, frames_to_send_); + } size_t offset = 0; - while (!frames_to_send.empty()) { + while (!frames_to_send_local.empty()) { LOG4CXX_INFO(logger_, "frames_to_send is not empty"); - ::protocol_handler::RawMessagePtr frame = frames_to_send.front(); + ::protocol_handler::RawMessagePtr frame = frames_to_send_local.front(); const ssize_t bytes_sent = ::send(socket_, frame->data() + offset, frame->data_size() - offset, 0); @@ -304,14 +318,14 @@ bool ThreadedSocketConnection::Send() { LOG4CXX_DEBUG(logger_, "bytes_sent >= 0"); offset += bytes_sent; if (offset == frame->data_size()) { - frames_to_send.pop(); + frames_to_send_local.pop(); offset = 0; controller_->DataSendDone(device_handle(), application_handle(), frame); } } else { LOG4CXX_DEBUG(logger_, "bytes_sent < 0"); LOG4CXX_ERROR_WITH_ERRNO(logger_, "Send failed for connection " << this); - frames_to_send.pop(); + frames_to_send_local.pop(); offset = 0; controller_->DataSendFailed(device_handle(), application_handle(), frame, DataSendError()); |