// Copyright (C) 2013-2020 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. #ifdef _WIN32 #include #include #else #include #include #include #endif #include #include #include #include #include namespace CommonAPI { namespace DBus { DBusDispatchSource::DBusDispatchSource(std::weak_ptr dbusConnection): dbusConnection_(dbusConnection) { } DBusDispatchSource::~DBusDispatchSource() { } bool DBusDispatchSource::prepare(int64_t &_timeout) { _timeout = -1; if(auto itsConnection = dbusConnection_.lock()) { return itsConnection->isDispatchReady(); } return false; } bool DBusDispatchSource::check() { if(auto itsConnection = dbusConnection_.lock()) { return itsConnection->isDispatchReady(); } return false; } bool DBusDispatchSource::dispatch() { if(auto itsConnection = dbusConnection_.lock()) { return itsConnection->singleDispatch(); } return false; } DBusQueueDispatchSource::DBusQueueDispatchSource(DBusQueueWatch* watch) : watch_(watch) { watch_->addDependentDispatchSource(this); } DBusQueueDispatchSource::~DBusQueueDispatchSource() { watch_->removeDependentDispatchSource(this); } bool DBusQueueDispatchSource::prepare(int64_t& timeout) { timeout = -1; return !watch_->emptyQueue(); } bool DBusQueueDispatchSource::check() { return !watch_->emptyQueue(); } bool DBusQueueDispatchSource::dispatch() { if (auto queueEntry = watch_->frontQueue()) { watch_->popQueue(); watch_->processQueueEntry(queueEntry); } return !watch_->emptyQueue(); } DBusWatch::DBusWatch(::DBusWatch* libdbusWatch, std::weak_ptr& mainLoopContext, std::weak_ptr& dbusConnection): libdbusWatch_(libdbusWatch), mainLoopContext_(mainLoopContext), dbusConnection_(dbusConnection) { if (NULL == libdbusWatch_) { COMMONAPI_ERROR(std::string(__FUNCTION__) + " libdbusWatch_ == NULL"); } } bool DBusWatch::isReadyToBeWatched() { return 0 != dbus_watch_get_enabled(libdbusWatch_); } void DBusWatch::startWatching() { if(!dbus_watch_get_enabled(libdbusWatch_)) stopWatching(); unsigned int channelFlags_ = dbus_watch_get_flags(libdbusWatch_); short int pollFlags = 0; if(channelFlags_ & DBUS_WATCH_READABLE) { pollFlags |= POLLIN; } if(channelFlags_ & DBUS_WATCH_WRITABLE) { pollFlags |= POLLOUT; } #ifdef _WIN32 pollFileDescriptor_.fd = dbus_watch_get_socket(libdbusWatch_); wsaEvent_ = WSACreateEvent(); WSAEventSelect(pollFileDescriptor_.fd, wsaEvent_, FD_READ); #else pollFileDescriptor_.fd = dbus_watch_get_unix_fd(libdbusWatch_); #endif pollFileDescriptor_.events = pollFlags; pollFileDescriptor_.revents = 0; auto lockedContext = mainLoopContext_.lock(); if (NULL == lockedContext) { COMMONAPI_ERROR(std::string(__FUNCTION__) + " lockedContext == NULL"); } else { lockedContext->registerWatch(this); } } void DBusWatch::stopWatching() { auto lockedContext = mainLoopContext_.lock(); if (lockedContext) { lockedContext->deregisterWatch(this); } } const pollfd& DBusWatch::getAssociatedFileDescriptor() { return pollFileDescriptor_; } #ifdef _WIN32 const HANDLE& DBusWatch::getAssociatedEvent() { return wsaEvent_; } #endif void DBusWatch::dispatch(unsigned int eventFlags) { #ifdef _WIN32 unsigned int dbusWatchFlags = 0; if (eventFlags & (POLLRDBAND | POLLRDNORM)) { dbusWatchFlags |= DBUS_WATCH_READABLE; } if (eventFlags & POLLWRNORM) { dbusWatchFlags |= DBUS_WATCH_WRITABLE; } if (eventFlags & (POLLERR | POLLNVAL)) { dbusWatchFlags |= DBUS_WATCH_ERROR; } if (eventFlags & POLLHUP) { dbusWatchFlags |= DBUS_WATCH_HANGUP; } #else // Pollflags do not correspond directly to DBus watch flags unsigned int dbusWatchFlags = (eventFlags & POLLIN) | ((eventFlags & POLLOUT) >> 1) | ((eventFlags & POLLERR) >> 1) | ((eventFlags & POLLHUP) >> 1); #endif std::shared_ptr itsConnection = dbusConnection_.lock(); if(itsConnection) { if(itsConnection->setDispatching(true)) { dbus_bool_t response = dbus_watch_handle(libdbusWatch_, dbusWatchFlags); if (!response) { printf("dbus_watch_handle returned FALSE!"); } itsConnection->setDispatching(false); } } } const std::vector& DBusWatch::getDependentDispatchSources() { std::lock_guard itsLock(dependentDispatchSourcesMutex_); return dependentDispatchSources_; } void DBusWatch::addDependentDispatchSource(DispatchSource* dispatchSource) { std::lock_guard itsLock(dependentDispatchSourcesMutex_); dependentDispatchSources_.push_back(dispatchSource); } DBusQueueWatch::DBusQueueWatch(std::shared_ptr _connection) : #ifdef _WIN32 pipeValue_(4) #else eventFd_(0), eventFdValue_(1) #endif { #ifdef _WIN32 WSADATA wsaData; int iResult; SOCKET ListenSocket = INVALID_SOCKET; struct addrinfo *result = NULL; struct addrinfo hints; // Initialize Winsock iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); if (iResult != 0) { printf("WSAStartup failed with error: %d\n", iResult); } ZeroMemory(&hints, sizeof(hints)); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_PASSIVE; // Resolve the server address and port iResult = getaddrinfo(NULL, "0", &hints, &result); if (iResult != 0) { printf("getaddrinfo failed with error: %d\n", iResult); WSACleanup(); } // Create a SOCKET for connecting to server ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol); if (ListenSocket == INVALID_SOCKET) { printf("socket failed with error: %ld\n", WSAGetLastError()); freeaddrinfo(result); WSACleanup(); } // Setup the TCP listening socket iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen); if (iResult == SOCKET_ERROR) { printf("bind failed with error: %d\n", WSAGetLastError()); freeaddrinfo(result); closesocket(ListenSocket); WSACleanup(); } sockaddr* connected_addr = new sockaddr(); USHORT port = 0; int namelength = sizeof(sockaddr); iResult = getsockname(ListenSocket, connected_addr, &namelength); if (iResult == SOCKET_ERROR) { printf("getsockname failed with error: %d\n", WSAGetLastError()); } else if (connected_addr->sa_family == AF_INET) { port = ((struct sockaddr_in*)connected_addr)->sin_port; } delete connected_addr; freeaddrinfo(result); iResult = listen(ListenSocket, SOMAXCONN); if (iResult == SOCKET_ERROR) { printf("listen failed with error: %d\n", WSAGetLastError()); closesocket(ListenSocket); WSACleanup(); } wsaData; pipeFileDescriptors_[0] = INVALID_SOCKET; struct addrinfo *ptr = NULL; // Initialize Winsock iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); if (iResult != 0) { printf("WSAStartup failed with error: %d\n", iResult); } ZeroMemory(&hints, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; // Resolve the server address and port iResult = getaddrinfo("127.0.0.1", std::to_string(ntohs(port)).c_str(), &hints, &result); if (iResult != 0) { printf("getaddrinfo failed with error: %d\n", iResult); WSACleanup(); } // Attempt to connect to an address until one succeeds for (ptr = result; ptr != NULL; ptr = ptr->ai_next) { // Create a SOCKET for connecting to server pipeFileDescriptors_[0] = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol); if (pipeFileDescriptors_[0] == INVALID_SOCKET) { printf("socket failed with error: %ld\n", WSAGetLastError()); WSACleanup(); } // Connect to server. iResult = connect(pipeFileDescriptors_[0], ptr->ai_addr, (int)ptr->ai_addrlen); if (iResult == SOCKET_ERROR) { printf("connect failed with error: %ld\n", WSAGetLastError()); closesocket(pipeFileDescriptors_[0]); pipeFileDescriptors_[0] = INVALID_SOCKET; continue; } break; } freeaddrinfo(result); if (pipeFileDescriptors_[0] == INVALID_SOCKET) { printf("Unable to connect to server!\n"); WSACleanup(); } // Accept a client socket pipeFileDescriptors_[1] = accept(ListenSocket, NULL, NULL); if (pipeFileDescriptors_[1] == INVALID_SOCKET) { printf("accept failed with error: %d\n", WSAGetLastError()); closesocket(ListenSocket); WSACleanup(); } pollFileDescriptor_.fd = pipeFileDescriptors_[0]; #else eventFd_ = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE); if (eventFd_ == -1) { std::perror(__func__); } pollFileDescriptor_.fd = eventFd_; #endif pollFileDescriptor_.events = POLLIN; connection_ = _connection; } DBusQueueWatch::~DBusQueueWatch() { #ifdef _WIN32 // shutdown the connection since no more data will be sent int iResult = shutdown(pipeFileDescriptors_[0], SD_SEND); if (iResult == SOCKET_ERROR) { printf("shutdown failed with error: %d\n", WSAGetLastError()); closesocket(pipeFileDescriptors_[0]); WSACleanup(); } // cleanup closesocket(pipeFileDescriptors_[0]); WSACleanup(); #else close(eventFd_); #endif std::unique_lock itsLock(queueMutex_); while(!queue_.empty()) { auto queueEntry = queue_.front(); queue_.pop(); queueEntry->clear(); } } void DBusQueueWatch::dispatch(unsigned int) { } const pollfd& DBusQueueWatch::getAssociatedFileDescriptor() { return pollFileDescriptor_; } #ifdef _WIN32 const HANDLE& DBusQueueWatch::getAssociatedEvent() { return wsaEvent_; } #endif const std::vector& DBusQueueWatch::getDependentDispatchSources() { std::lock_guard itsLock(dependentDispatchSourcesMutex_); return dependentDispatchSources_; } void DBusQueueWatch::addDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { std::lock_guard itsLock(dependentDispatchSourcesMutex_); dependentDispatchSources_.push_back(_dispatchSource); } void DBusQueueWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* _dispatchSource) { std::lock_guard itsLock(dependentDispatchSourcesMutex_); std::vector::iterator it; for (it = dependentDispatchSources_.begin(); it != dependentDispatchSources_.end(); it++) { if ( (*it) == _dispatchSource ) { dependentDispatchSources_.erase(it); break; } } } void DBusQueueWatch::pushQueue(std::shared_ptr _queueEntry) { { std::unique_lock itsLock(queueMutex_); queue_.push(_queueEntry); } #ifdef _WIN32 // Send an initial buffer char *sendbuf = "1"; int iResult = send(pipeFileDescriptors_[1], sendbuf, (int)strlen(sendbuf), 0); if (iResult == SOCKET_ERROR) { int error = WSAGetLastError(); if (error != WSANOTINITIALISED) { printf("send failed with error: %d\n", error); } } #else while (write(eventFd_, &eventFdValue_, sizeof(eventFdValue_)) == -1) { if (errno != EAGAIN && errno != EINTR) { std::perror(__func__); break; } std::this_thread::yield(); } #endif } void DBusQueueWatch::popQueue() { #ifdef _WIN32 // Receive until the peer closes the connection int iResult; char recvbuf[1]; int recvbuflen = 1; iResult = recv(pipeFileDescriptors_[0], recvbuf, recvbuflen, 0); if (iResult > 0) { //printf("Bytes received from %d: %d\n", wakeFd_.fd, iResult); } else if (iResult == 0) { printf("Connection closed\n"); } else { printf("recv failed with error: %d\n", WSAGetLastError()); } #else std::uint64_t readValue(0); while (read(eventFd_, &readValue, sizeof(readValue)) == -1) { if (errno != EAGAIN && errno != EINTR) { std::perror(__func__); break; } std::this_thread::yield(); } #endif { std::unique_lock itsLock(queueMutex_); queue_.pop(); } } std::shared_ptr DBusQueueWatch::frontQueue() { std::unique_lock itsLock(queueMutex_); if(queue_.empty()) { return NULL; } else { return queue_.front(); } } bool DBusQueueWatch::emptyQueue() { std::unique_lock itsLock(queueMutex_); return queue_.empty(); } void DBusQueueWatch::processQueueEntry(std::shared_ptr _queueEntry) { std::shared_ptr itsConnection = connection_.lock(); if(itsConnection) { _queueEntry->process(itsConnection); } } } // namespace DBus } // namespace CommonAPI