From ef19d923c46c43e937f1d4dd91c906bda08d64bd Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 10 Dec 2007 17:57:49 +0000 Subject: src/tests/SocketProxy.h: proxy between local client & server to simulate network disconnect. src/qpid/client/Connector.h: remove friend hack for previous flawed disconnect approach. src/tests/BrokerFixture.h: "" src/tests/ClientSessionTest.cpp, exception_test.cpp: use ProxyConnection git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@602980 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 80 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 qpid/cpp/src/tests/SocketProxy.h (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h new file mode 100644 index 0000000000..03d9b6ad35 --- /dev/null +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -0,0 +1,80 @@ +#ifndef SOCKETPROXY_H +#define SOCKETPROXY_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Socket.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" + +/** + * A simple socket proxy that forwards to another socket. Used between + * client & broker to simulate network failures. + */ +struct SocketProxy : public qpid::sys::Runnable +{ + int port; // Port bound to server socket. + qpid::sys::Socket client, server; // Client & server sockets. + + SocketProxy(const std::string& host, int port) { init(host,port); } + SocketProxy(int port) { init("localhost",port); } + + ~SocketProxy() { client.close(); server.close(); thread.join(); } + + private: + + void init(const std::string& host, int port) { + client.connect(host,port); + port = server.listen(); + thread=qpid::sys::Thread(this); + } + + void run() { + do { + ssize_t recv = server.recv(buffer, sizeof(buffer)); + if (recv <= 0) return; + ssize_t sent=client.send(buffer, recv); + if (sent < 0) return; + assert(sent == recv); // Assumes we can send as we receive. + } while (true); + } + + qpid::sys::Thread thread; + char buffer[64*1024]; +}; + +/** A local client connection via a socket proxy. */ +struct ProxyConnection : public qpid::client::Connection { + SocketProxy proxy; + qpid::client::Session_0_10 session; + + ProxyConnection(const std::string& host, int port) : proxy(port) { + open(host, proxy.port); + session=newSession(); + } + + ProxyConnection(int port) : proxy(port) { + open("localhost", proxy.port); + session=newSession(); + } +}; + +#endif -- cgit v1.2.1 From a1ac9e3af90d2f3ec06fc1bf510d3e1d963ca4aa Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 11 Dec 2007 15:29:54 +0000 Subject: src/tests/ClientSessionTest.cpp: Disabled hanging test: testDisconnectResume. src/tests/SocketProxy.h: fixed exception handling. src/tests/exception_test.cpp: fixed compile error. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@603273 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index 03d9b6ad35..b985ded175 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -41,20 +41,22 @@ struct SocketProxy : public qpid::sys::Runnable private: - void init(const std::string& host, int port) { - client.connect(host,port); + void init(const std::string& host, int connectPort) { + client.connect(host, connectPort); port = server.listen(); thread=qpid::sys::Thread(this); } void run() { - do { - ssize_t recv = server.recv(buffer, sizeof(buffer)); - if (recv <= 0) return; - ssize_t sent=client.send(buffer, recv); - if (sent < 0) return; - assert(sent == recv); // Assumes we can send as we receive. - } while (true); + try { + do { + ssize_t recv = server.recv(buffer, sizeof(buffer)); + if (recv <= 0) return; + ssize_t sent=client.send(buffer, recv); + if (sent < 0) return; + assert(sent == recv); // Assumes we can send as we receive. + } while (true); + } catch(...) {} } qpid::sys::Thread thread; -- cgit v1.2.1 From 21929425a5e5f62b74c67b20641f5e029551770f Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 24 Jan 2008 22:26:12 +0000 Subject: Improved/additional client API tests. - Replaced InProcessBroker with a more accurate loopback BrokerFixture. - Added asserts for mutex/condition/thread errors in debug build. - Added client tests for several exception conditions. - Added peer address to log ouput, client/server distinguished by (addr) or [addr] - Fixed various deadlocks & races exposed by the new asserts & tests. File-by-file: New BrokerFixture replaces InProcessBroker D src/tests/InProcessBroker.h M src/tests/BrokerFixture.h M src/tests/SocketProxy.h M src/tests/Makefile.am Made it run a bit faster. M src/tests/quick_perftest Redundant D src/tests/APRBaseTest.cpp Updated tests to use BrokerFixture M src/tests/ClientChannelTest.cpp M src/tests/exception_test.cpp M src/tests/ClientSessionTest.cpp Print thread IDs in decimal, same as GDB. M src/qpid/log/Logger.cpp Assert mutex/condition ops in debug build. M src/qpid/sys/posix/check.h M src/qpid/sys/posix/Mutex.h M src/qpid/sys/posix/Condition.h M src/qpid/sys/posix/Thread.h Added toFd() so SocketProxy can use ::select() M src/qpid/sys/Socket.h M src/qpid/sys/posix/Socket.cpp Fixes for races & deadlocks shown up by new tests & asserts. Mostly shutdown/close issues. M src/qpid/client/ConnectionHandler.h M src/qpid/client/ConnectionImpl.cpp M src/qpid/client/Demux.h M src/qpid/client/SessionCore.cpp M src/qpid/client/ConnectionHandler.cpp M src/qpid/client/Connector.h M src/qpid/client/Demux.cpp M src/qpid/client/Dispatcher.cpp M src/qpid/client/ConnectionImpl.h Logging peer address. M src/qpid/sys/AsynchIOAcceptor.cpp git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@615063 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 158 +++++++++++++++++++++++++++++---------- 1 file changed, 120 insertions(+), 38 deletions(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index b985ded175..a37c1f2c3e 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -24,59 +24,141 @@ #include "qpid/sys/Socket.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/Mutex.h" +#include "qpid/client/Connection.h" +#include "qpid/log/Statement.h" + +#include /** - * A simple socket proxy that forwards to another socket. Used between - * client & broker to simulate network failures. + * A simple socket proxy that forwards to another socket. + * Used between client & local broker to simulate network failures. */ -struct SocketProxy : public qpid::sys::Runnable +class SocketProxy : private qpid::sys::Runnable { - int port; // Port bound to server socket. - qpid::sys::Socket client, server; // Client & server sockets. + public: + /** Connect to connectPort on host, start a forwarding thread. + * Listen for connection on getPort(). + */ + SocketProxy(int connectPort, const std::string host="localhost") + : closed(false), port(listener.listen()) + { + int r=::pipe(closePipe); + if (r<0) throwErrno(QPID_MSG("::pipe returned " << r)); + client.connect(host, connectPort); + thread = qpid::sys::Thread(static_cast(this)); + } + + ~SocketProxy() { close(); } - SocketProxy(const std::string& host, int port) { init(host,port); } - SocketProxy(int port) { init("localhost",port); } + /** Simulate a network disconnect. */ + void close() { + { + qpid::sys::Mutex::ScopedLock l(lock); + if (closed) return; + closed=true; + } + write(closePipe[1], this, 1); // Random byte to closePipe + thread.join(); + client.close(); + ::close(closePipe[0]); + ::close(closePipe[1]); + } - ~SocketProxy() { client.close(); server.close(); thread.join(); } + bool isClosed() const { + qpid::sys::Mutex::ScopedLock l(lock); + return closed; + } + + uint16_t getPort() const { return port; } private: - - void init(const std::string& host, int connectPort) { - client.connect(host, connectPort); - port = server.listen(); - thread=qpid::sys::Thread(this); + static void throwErrno(const std::string& msg) { + throw qpid::Exception(msg+":"+qpid::strError(errno)); } - - void run() { - try { - do { - ssize_t recv = server.recv(buffer, sizeof(buffer)); - if (recv <= 0) return; - ssize_t sent=client.send(buffer, recv); - if (sent < 0) return; - assert(sent == recv); // Assumes we can send as we receive. - } while (true); - } catch(...) {} + static void throwIf(bool condition, const std::string& msg) { + if (condition) throw qpid::Exception(msg); } + + struct FdSet : fd_set { + FdSet() : maxFd(0) { clear(); } + void clear() { FD_ZERO(this); } + void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); } + bool isSet(int fd) const { return FD_ISSET(fd, this); } + bool operator[](int fd) const { return isSet(fd); } - qpid::sys::Thread thread; - char buffer[64*1024]; -}; + int maxFd; + }; -/** A local client connection via a socket proxy. */ -struct ProxyConnection : public qpid::client::Connection { - SocketProxy proxy; - qpid::client::Session_0_10 session; + enum { RD=1, WR=2, ER=4 }; - ProxyConnection(const std::string& host, int port) : proxy(port) { - open(host, proxy.port); - session=newSession(); - } + struct Selector { + FdSet rd, wr, er; + + void set(int fd, int sets) { + if (sets & RD) rd.set(fd); + if (sets & WR) wr.set(fd); + if (sets & ER) er.set(fd); + } + + int select() { + for (;;) { + int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd)); + int r = ::select(maxFd + 1, &rd, &wr, &er, NULL); + if (r == -1 && errno == EINTR) continue; + if (r < 0) throwErrno(QPID_MSG("select returned " < server; + try { + // Accept incoming connections, watch closePipe. + Selector accept; + accept.set(listener.toFd(), RD|ER); + accept.set(closePipe[0], RD|ER); + accept.select(); + throwIf(accept.rd[closePipe[0]], "Closed by close()"); + throwIf(!accept.rd[listener.toFd()],"Accept failed"); + server.reset(listener.accept(0, 0)); + + // Pump data between client & server sockets, watch closePipe. + char buffer[1024]; + for (;;) { + Selector select; + select.set(server->toFd(), RD|ER); + select.set(client.toFd(), RD|ER); + select.set(closePipe[0], RD|ER); + select.select(); + throwIf(select.rd[closePipe[0]], "Closed by close()"); + // Read even if fd is in error to throw a useful exception. + bool gotData=false; + if (select.rd[server->toFd()] || select.er[server->toFd()]) { + client.write(buffer, server->read(buffer, sizeof(buffer))); + gotData=true; + } + if (select.rd[client.toFd()] || select.er[client.toFd()]) { + server->write(buffer, client.read(buffer, sizeof(buffer))); + gotData=true; + } + throwIf(!gotData, "No data from select()"); + } + } + catch (const std::exception& e) { + QPID_LOG(debug, "SocketProxy::run exiting: " << e.what()); + } + if (server.get()) server->close(); + close(); } + + mutable qpid::sys::Mutex lock; + bool closed; + qpid::sys::Socket client, listener; + uint16_t port; + int closePipe[2]; + qpid::sys::Thread thread; }; #endif -- cgit v1.2.1 From 7bf3ed2b4bca4706e3837126d597ae5d2ee11537 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Tue, 15 Apr 2008 15:41:21 +0000 Subject: Refactored the IO framework that sits on top of Poller so that it uses a generalised IOHandle. This means that you can define new classes derived from IOHandle (other than Socket) that can also be added to a Poller and waited for. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@648288 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 84 ++++++++++++---------------------------- 1 file changed, 24 insertions(+), 60 deletions(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index a37c1f2c3e..3263652fe2 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -22,6 +22,7 @@ */ #include "qpid/sys/Socket.h" +#include "qpid/sys/Poller.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Mutex.h" @@ -43,8 +44,6 @@ class SocketProxy : private qpid::sys::Runnable SocketProxy(int connectPort, const std::string host="localhost") : closed(false), port(listener.listen()) { - int r=::pipe(closePipe); - if (r<0) throwErrno(QPID_MSG("::pipe returned " << r)); client.connect(host, connectPort); thread = qpid::sys::Thread(static_cast(this)); } @@ -58,11 +57,9 @@ class SocketProxy : private qpid::sys::Runnable if (closed) return; closed=true; } - write(closePipe[1], this, 1); // Random byte to closePipe + poller.shutdown(); thread.join(); client.close(); - ::close(closePipe[0]); - ::close(closePipe[1]); } bool isClosed() const { @@ -79,71 +76,38 @@ class SocketProxy : private qpid::sys::Runnable static void throwIf(bool condition, const std::string& msg) { if (condition) throw qpid::Exception(msg); } - - struct FdSet : fd_set { - FdSet() : maxFd(0) { clear(); } - void clear() { FD_ZERO(this); } - void set(int fd) { FD_SET(fd, this); maxFd = std::max(maxFd, fd); } - bool isSet(int fd) const { return FD_ISSET(fd, this); } - bool operator[](int fd) const { return isSet(fd); } - - int maxFd; - }; - - enum { RD=1, WR=2, ER=4 }; - - struct Selector { - FdSet rd, wr, er; - - void set(int fd, int sets) { - if (sets & RD) rd.set(fd); - if (sets & WR) wr.set(fd); - if (sets & ER) er.set(fd); - } - - int select() { - for (;;) { - int maxFd = std::max(rd.maxFd, std::max(wr.maxFd, er.maxFd)); - int r = ::select(maxFd + 1, &rd, &wr, &er, NULL); - if (r == -1 && errno == EINTR) continue; - if (r < 0) throwErrno(QPID_MSG("select returned " < server; try { - // Accept incoming connections, watch closePipe. - Selector accept; - accept.set(listener.toFd(), RD|ER); - accept.set(closePipe[0], RD|ER); - accept.select(); - throwIf(accept.rd[closePipe[0]], "Closed by close()"); - throwIf(!accept.rd[listener.toFd()],"Accept failed"); + qpid::sys::PollerHandle listenerHandle(listener); + poller.addFd(listenerHandle, qpid::sys::Poller::IN); + qpid::sys::Poller::Event event = poller.wait(); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); + throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "Accept failed"); + + poller.delFd(listenerHandle); server.reset(listener.accept(0, 0)); - // Pump data between client & server sockets, watch closePipe. + // Pump data between client & server sockets + qpid::sys::PollerHandle clientHandle(client); + qpid::sys::PollerHandle serverHandle(*server); + poller.addFd(clientHandle, qpid::sys::Poller::IN); + poller.addFd(serverHandle, qpid::sys::Poller::IN); char buffer[1024]; for (;;) { - Selector select; - select.set(server->toFd(), RD|ER); - select.set(client.toFd(), RD|ER); - select.set(closePipe[0], RD|ER); - select.select(); - throwIf(select.rd[closePipe[0]], "Closed by close()"); - // Read even if fd is in error to throw a useful exception. - bool gotData=false; - if (select.rd[server->toFd()] || select.er[server->toFd()]) { + qpid::sys::Poller::Event event = poller.wait(); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); + throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "client/server disconnected"); + if (event.handle == &serverHandle) { client.write(buffer, server->read(buffer, sizeof(buffer))); - gotData=true; - } - if (select.rd[client.toFd()] || select.er[client.toFd()]) { + poller.rearmFd(serverHandle); + } else if (event.handle == &clientHandle) { server->write(buffer, client.read(buffer, sizeof(buffer))); - gotData=true; + poller.rearmFd(clientHandle); + } else { + throwIf(true, "No handle ready"); } - throwIf(!gotData, "No data from select()"); } } catch (const std::exception& e) { @@ -155,9 +119,9 @@ class SocketProxy : private qpid::sys::Runnable mutable qpid::sys::Mutex lock; bool closed; + qpid::sys::Poller poller; qpid::sys::Socket client, listener; uint16_t port; - int closePipe[2]; qpid::sys::Thread thread; }; -- cgit v1.2.1 From 3eedb6bffa26b6beed6776277f70479d91a31ca0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 9 May 2008 18:46:17 +0000 Subject: Support for 0-10 sessions, not yet integrated. Misc minor fixes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@654913 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index 3263652fe2..b53387bd57 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -42,7 +42,7 @@ class SocketProxy : private qpid::sys::Runnable * Listen for connection on getPort(). */ SocketProxy(int connectPort, const std::string host="localhost") - : closed(false), port(listener.listen()) + : closed(false), port(listener.listen()), dropClient(), dropServer() { client.connect(host, connectPort); thread = qpid::sys::Thread(static_cast(this)); @@ -58,10 +58,17 @@ class SocketProxy : private qpid::sys::Runnable closed=true; } poller.shutdown(); + if (thread.id() != qpid::sys::Thread::current().id()) thread.join(); client.close(); } + /** Simulate lost packets, drop data from client */ + void dropClientData(bool drop=true) { dropClient=drop; } + + /** Simulate lost packets, drop data from server */ + void dropServerData(bool drop=true) { dropServer=drop; } + bool isClosed() const { qpid::sys::Mutex::ScopedLock l(lock); return closed; @@ -83,8 +90,8 @@ class SocketProxy : private qpid::sys::Runnable qpid::sys::PollerHandle listenerHandle(listener); poller.addFd(listenerHandle, qpid::sys::Poller::IN); qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); - throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "Accept failed"); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); + throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed"); poller.delFd(listenerHandle); server.reset(listener.accept(0, 0)); @@ -97,25 +104,32 @@ class SocketProxy : private qpid::sys::Runnable char buffer[1024]; for (;;) { qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "Closed by close()"); - throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "client/server disconnected"); + throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); + throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected"); if (event.handle == &serverHandle) { - client.write(buffer, server->read(buffer, sizeof(buffer))); + ssize_t n = server->read(buffer, sizeof(buffer)); + if (!dropServer) client.write(buffer, n); poller.rearmFd(serverHandle); } else if (event.handle == &clientHandle) { - server->write(buffer, client.read(buffer, sizeof(buffer))); + ssize_t n = client.read(buffer, sizeof(buffer)); + if (!dropClient) server->write(buffer, n); poller.rearmFd(clientHandle); } else { - throwIf(true, "No handle ready"); + throwIf(true, "SocketProxy: No handle ready"); } } } catch (const std::exception& e) { - QPID_LOG(debug, "SocketProxy::run exiting: " << e.what()); + QPID_LOG(debug, "SocketProxy::run exception: " << e.what()); } + try { if (server.get()) server->close(); close(); } + catch (const std::exception& e) { + QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what()); + } + } mutable qpid::sys::Mutex lock; bool closed; @@ -123,6 +137,7 @@ class SocketProxy : private qpid::sys::Runnable qpid::sys::Socket client, listener; uint16_t port; qpid::sys::Thread thread; + bool dropClient, dropServer; }; #endif -- cgit v1.2.1 From fba71cbd9623141c7522718b9477dae34726dd89 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Wed, 30 Jul 2008 06:29:16 +0000 Subject: Related to QPID-1198: Moved posix platform specific "strerror" code to platform specific directory git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@680920 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index b53387bd57..a1a1351c7d 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -78,7 +78,7 @@ class SocketProxy : private qpid::sys::Runnable private: static void throwErrno(const std::string& msg) { - throw qpid::Exception(msg+":"+qpid::strError(errno)); + throw qpid::Exception(msg+":"+qpid::sys::strError(errno)); } static void throwIf(bool condition, const std::string& msg) { if (condition) throw qpid::Exception(msg); -- cgit v1.2.1 From 65db1dc07a33aa419b842a34e61fa5781841b0bf Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Tue, 21 Oct 2008 18:29:44 +0000 Subject: Refactor sys::AsynchIO class to allow reimplementing on other platforms without affecting upper level usage. Resolves QPID-1377 and supplies Windows AsynchIO.cpp git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@706709 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index a1a1351c7d..9722359d82 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -88,7 +88,7 @@ class SocketProxy : private qpid::sys::Runnable std::auto_ptr server; try { qpid::sys::PollerHandle listenerHandle(listener); - poller.addFd(listenerHandle, qpid::sys::Poller::IN); + poller.addFd(listenerHandle, qpid::sys::Poller::INPUT); qpid::sys::Poller::Event event = poller.wait(); throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed"); @@ -99,8 +99,8 @@ class SocketProxy : private qpid::sys::Runnable // Pump data between client & server sockets qpid::sys::PollerHandle clientHandle(client); qpid::sys::PollerHandle serverHandle(*server); - poller.addFd(clientHandle, qpid::sys::Poller::IN); - poller.addFd(serverHandle, qpid::sys::Poller::IN); + poller.addFd(clientHandle, qpid::sys::Poller::INPUT); + poller.addFd(serverHandle, qpid::sys::Poller::INPUT); char buffer[1024]; for (;;) { qpid::sys::Poller::Event event = poller.wait(); -- cgit v1.2.1 From 273ba63c838834b46cf4c23c1ead8ccc27d8f19b Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 4 Mar 2009 09:26:28 +0000 Subject: QPID-1710: update test inline with modified method signature. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@749951 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index 9722359d82..d2a93c902b 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -94,7 +94,7 @@ class SocketProxy : private qpid::sys::Runnable throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed"); poller.delFd(listenerHandle); - server.reset(listener.accept(0, 0)); + server.reset(listener.accept()); // Pump data between client & server sockets qpid::sys::PollerHandle clientHandle(client); -- cgit v1.2.1 From fe1ae254cf30d8209568d31cb2c0eb0c76646650 Mon Sep 17 00:00:00 2001 From: "Stephen D. Huston" Date: Thu, 26 Mar 2009 21:04:47 +0000 Subject: Enable SocketProxy portability to Windows; fixes QPID-1765 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@758852 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 111 ++++++++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 41 deletions(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index d2a93c902b..7697bf635f 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -21,45 +21,58 @@ * */ +#include "qpid/sys/IOHandle.h" +#ifdef _WIN32 +# include "qpid/sys/windows/IoHandlePrivate.h" + typedef SOCKET FdType; +#else +# include "qpid/sys/posix/PrivatePosix.h" + typedef int FdType; +#endif #include "qpid/sys/Socket.h" -#include "qpid/sys/Poller.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Mutex.h" -#include "qpid/client/Connection.h" #include "qpid/log/Statement.h" -#include - /** * A simple socket proxy that forwards to another socket. * Used between client & local broker to simulate network failures. */ class SocketProxy : private qpid::sys::Runnable { + // Need a Socket we can get the fd from + class LowSocket : public qpid::sys::Socket { + public: + FdType getFd() { return toFd(impl); } + }; + public: /** Connect to connectPort on host, start a forwarding thread. * Listen for connection on getPort(). */ SocketProxy(int connectPort, const std::string host="localhost") - : closed(false), port(listener.listen()), dropClient(), dropServer() + : closed(false), joined(true), + port(listener.listen()), dropClient(), dropServer() { client.connect(host, connectPort); + joined = false; thread = qpid::sys::Thread(static_cast(this)); } - ~SocketProxy() { close(); } + ~SocketProxy() { close(); if (!joined) thread.join(); } /** Simulate a network disconnect. */ void close() { { qpid::sys::Mutex::ScopedLock l(lock); - if (closed) return; + if (closed) { return; } closed=true; } - poller.shutdown(); - if (thread.id() != qpid::sys::Thread::current().id()) - thread.join(); + if (thread.id() != qpid::sys::Thread::current().id()) { + thread.join(); + joined = true; + } client.close(); } @@ -85,56 +98,72 @@ class SocketProxy : private qpid::sys::Runnable } void run() { - std::auto_ptr server; + std::auto_ptr server; try { - qpid::sys::PollerHandle listenerHandle(listener); - poller.addFd(listenerHandle, qpid::sys::Poller::INPUT); - qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); - throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed"); - - poller.delFd(listenerHandle); - server.reset(listener.accept()); - - // Pump data between client & server sockets - qpid::sys::PollerHandle clientHandle(client); - qpid::sys::PollerHandle serverHandle(*server); - poller.addFd(clientHandle, qpid::sys::Poller::INPUT); - poller.addFd(serverHandle, qpid::sys::Poller::INPUT); + fd_set socks; + FD_ZERO(&socks); + FdType maxFd = listener.getFd(); + FD_SET(maxFd, &socks); + struct timeval tmo; + for (;;) { + tmo.tv_sec = 0; + tmo.tv_usec = 500 * 1000; + if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) { + qpid::sys::Mutex::ScopedLock l(lock); + throwIf(closed, "SocketProxy: Closed by close()"); + continue; + } + throwIf(!FD_ISSET(maxFd, &socks), "SocketProxy: Accept failed"); + break; // Accept ready... go to next step + } + server.reset(reinterpret_cast(listener.accept())); + maxFd = server->getFd(); + if (client.getFd() > maxFd) + maxFd = client.getFd(); char buffer[1024]; for (;;) { - qpid::sys::Poller::Event event = poller.wait(); - throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()"); - throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected"); - if (event.handle == &serverHandle) { + FD_ZERO(&socks); + tmo.tv_sec = 0; + tmo.tv_usec = 500 * 1000; + FD_SET(client.getFd(), &socks); + FD_SET(server->getFd(), &socks); + if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) { + qpid::sys::Mutex::ScopedLock l(lock); + throwIf(closed, "SocketProxy: Closed by close()"); + continue; + } + // Something is set; relay data as needed until something closes + if (FD_ISSET(server->getFd(), &socks)) { ssize_t n = server->read(buffer, sizeof(buffer)); + throwIf(n <= 0, "SocketProxy: server disconnected"); if (!dropServer) client.write(buffer, n); - poller.rearmFd(serverHandle); - } else if (event.handle == &clientHandle) { + } + if (FD_ISSET(client.getFd(), &socks)) { ssize_t n = client.read(buffer, sizeof(buffer)); - if (!dropClient) server->write(buffer, n); - poller.rearmFd(clientHandle); - } else { - throwIf(true, "SocketProxy: No handle ready"); + throwIf(n <= 0, "SocketProxy: client disconnected"); + if (!dropServer) server->write(buffer, n); } + if (!FD_ISSET(client.getFd(), &socks) && + !FD_ISSET(server->getFd(), &socks)) + throwIf(true, "SocketProxy: No handle ready"); } } catch (const std::exception& e) { QPID_LOG(debug, "SocketProxy::run exception: " << e.what()); } try { - if (server.get()) server->close(); - close(); - } + if (server.get()) server->close(); + close(); + } catch (const std::exception& e) { QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what()); } } mutable qpid::sys::Mutex lock; - bool closed; - qpid::sys::Poller poller; - qpid::sys::Socket client, listener; + mutable bool closed; + bool joined; + LowSocket client, listener; uint16_t port; qpid::sys::Thread thread; bool dropClient, dropServer; -- cgit v1.2.1 From 803ecb638b36a6018b3c3ca9ab50642fb7cb812b Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 15 Apr 2009 20:18:45 +0000 Subject: Fix bug in SocketProxy causing occasional hangs in tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@765338 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index 7697bf635f..ccce3c8842 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -101,11 +101,11 @@ class SocketProxy : private qpid::sys::Runnable std::auto_ptr server; try { fd_set socks; - FD_ZERO(&socks); FdType maxFd = listener.getFd(); - FD_SET(maxFd, &socks); struct timeval tmo; for (;;) { + FD_ZERO(&socks); + FD_SET(maxFd, &socks); tmo.tv_sec = 0; tmo.tv_usec = 500 * 1000; if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) { -- cgit v1.2.1 From 9259c46ecb8c5f3e98441080a26914bdea59bffe Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Wed, 9 Sep 2009 19:46:56 +0000 Subject: Tidied up namespace usage Miscelleneous whitespace fixes git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@813094 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index ccce3c8842..9df32a1336 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -35,8 +35,11 @@ #include "qpid/sys/Mutex.h" #include "qpid/log/Statement.h" +namespace qpid { +namespace tests { + /** - * A simple socket proxy that forwards to another socket. + * A simple socket proxy that forwards to another socket. * Used between client & local broker to simulate network failures. */ class SocketProxy : private qpid::sys::Runnable @@ -59,7 +62,7 @@ class SocketProxy : private qpid::sys::Runnable joined = false; thread = qpid::sys::Thread(static_cast(this)); } - + ~SocketProxy() { close(); if (!joined) thread.join(); } /** Simulate a network disconnect. */ @@ -88,7 +91,7 @@ class SocketProxy : private qpid::sys::Runnable } uint16_t getPort() const { return port; } - + private: static void throwErrno(const std::string& msg) { throw qpid::Exception(msg+":"+qpid::sys::strError(errno)); @@ -153,7 +156,7 @@ class SocketProxy : private qpid::sys::Runnable } try { if (server.get()) server->close(); - close(); + close(); } catch (const std::exception& e) { QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what()); @@ -169,4 +172,6 @@ class SocketProxy : private qpid::sys::Runnable bool dropClient, dropServer; }; +}} // namespace qpid::tests + #endif -- cgit v1.2.1 From bc24dd44e6eb170a428d419087a4d528cb11558c Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Wed, 21 Oct 2009 21:53:53 +0000 Subject: Tidied up dependencies in IOHandle so that it is no longer dependent on the windows implementation classes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@828230 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index 9df32a1336..df243cb42a 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -47,7 +47,11 @@ class SocketProxy : private qpid::sys::Runnable // Need a Socket we can get the fd from class LowSocket : public qpid::sys::Socket { public: +#ifdef _WIN32 + FdType getFd() { return toSocketHandle(*this); } +#else FdType getFd() { return toFd(impl); } +#endif }; public: -- cgit v1.2.1 From 6ef9706fc3d447768b4d0d2b0cee8bea828677bd Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Tue, 15 Dec 2009 18:24:02 +0000 Subject: QPID-1951: Removed need for Windows versions of ssize_t and pid_t - Trivially removed Windows uses of ssize_t - Rearchitected how the Windows port finds an existing qpidd to stop it - Split Posix Lockfile functionality using pids into a new PidFile class git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@890929 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index df243cb42a..4582dc36fd 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -141,12 +141,12 @@ class SocketProxy : private qpid::sys::Runnable } // Something is set; relay data as needed until something closes if (FD_ISSET(server->getFd(), &socks)) { - ssize_t n = server->read(buffer, sizeof(buffer)); + int n = server->read(buffer, sizeof(buffer)); throwIf(n <= 0, "SocketProxy: server disconnected"); if (!dropServer) client.write(buffer, n); } if (FD_ISSET(client.getFd(), &socks)) { - ssize_t n = client.read(buffer, sizeof(buffer)); + int n = client.read(buffer, sizeof(buffer)); throwIf(n <= 0, "SocketProxy: client disconnected"); if (!dropServer) server->write(buffer, n); } -- cgit v1.2.1 From e20869eb4e1ed97f00c3f3e9843b14a00267d25b Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Wed, 21 Apr 2010 22:07:04 +0000 Subject: QPID-2527: Remove Thread::id member as its uses are better implemented by comparison operators. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@936537 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index 4582dc36fd..0c6f39d62e 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -76,7 +76,7 @@ class SocketProxy : private qpid::sys::Runnable if (closed) { return; } closed=true; } - if (thread.id() != qpid::sys::Thread::current().id()) { + if (thread && thread != qpid::sys::Thread::current()) { thread.join(); joined = true; } -- cgit v1.2.1 From 46bec4ce2ac0e8260eee6c5e2986bae0f6dafbec Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Thu, 26 May 2011 20:38:16 +0000 Subject: Refactor socket connect calls to take a string port This is used used to implement unix domain sockets - QPID-3281 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1128064 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/SocketProxy.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'qpid/cpp/src/tests/SocketProxy.h') diff --git a/qpid/cpp/src/tests/SocketProxy.h b/qpid/cpp/src/tests/SocketProxy.h index 0c6f39d62e..d195f11aa9 100644 --- a/qpid/cpp/src/tests/SocketProxy.h +++ b/qpid/cpp/src/tests/SocketProxy.h @@ -35,6 +35,8 @@ #include "qpid/sys/Mutex.h" #include "qpid/log/Statement.h" +#include + namespace qpid { namespace tests { @@ -62,7 +64,7 @@ class SocketProxy : private qpid::sys::Runnable : closed(false), joined(true), port(listener.listen()), dropClient(), dropServer() { - client.connect(host, connectPort); + client.connect(host, boost::lexical_cast(connectPort)); joined = false; thread = qpid::sys::Thread(static_cast(this)); } -- cgit v1.2.1