summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/TCPConnector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/TCPConnector.cpp')
-rw-r--r--cpp/src/qpid/client/TCPConnector.cpp25
1 files changed, 15 insertions, 10 deletions
diff --git a/cpp/src/qpid/client/TCPConnector.cpp b/cpp/src/qpid/client/TCPConnector.cpp
index a5c6465bad..783742764b 100644
--- a/cpp/src/qpid/client/TCPConnector.cpp
+++ b/cpp/src/qpid/client/TCPConnector.cpp
@@ -72,12 +72,13 @@ TCPConnector::TCPConnector(Poller::shared_ptr p,
closed(true),
shutdownHandler(0),
input(0),
+ socket(createSocket()),
connector(0),
aio(0),
poller(p)
{
QPID_LOG(debug, "TCPConnector created for " << version);
- settings.configureSocket(socket);
+ settings.configureSocket(*socket);
}
TCPConnector::~TCPConnector() {
@@ -88,7 +89,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) {
Mutex::ScopedLock l(lock);
assert(closed);
connector = AsynchConnector::create(
- socket,
+ *socket,
host, port,
boost::bind(&TCPConnector::connected, this, _1),
boost::bind(&TCPConnector::connectFailed, this, _3));
@@ -99,7 +100,7 @@ void TCPConnector::connect(const std::string& host, const std::string& port) {
void TCPConnector::connected(const Socket&) {
connector = 0;
- aio = AsynchIO::create(socket,
+ aio = AsynchIO::create(*socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
boost::bind(&TCPConnector::disconnected, this, _1),
@@ -116,7 +117,7 @@ void TCPConnector::start(sys::AsynchIO* aio_) {
aio->createBuffers(maxFrameSize);
- identifier = str(format("[%1%]") % socket.getFullAddress());
+ identifier = str(format("[%1%]") % socket->getFullAddress());
}
void TCPConnector::initAmqp() {
@@ -127,7 +128,7 @@ void TCPConnector::initAmqp() {
void TCPConnector::connectFailed(const std::string& msg) {
connector = 0;
QPID_LOG(warning, "Connect failed: " << msg);
- socket.close();
+ socket->close();
if (!closed)
closed = true;
if (shutdownHandler)
@@ -150,6 +151,11 @@ void TCPConnector::socketClosed(AsynchIO&, const Socket&) {
shutdownHandler->shutdown();
}
+void TCPConnector::connectAborted() {
+ connector->stop();
+ connectFailed("Connection timedout");
+}
+
void TCPConnector::abort() {
// Can't abort a closed connection
if (!closed) {
@@ -158,8 +164,7 @@ void TCPConnector::abort() {
aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
} else if (connector) {
// We're still connecting
- connector->stop();
- connectFailed("Connection timedout");
+ connector->requestCallback(boost::bind(&TCPConnector::connectAborted, this));
}
}
}
@@ -245,9 +250,9 @@ bool TCPConnector::canEncode()
}
// Called in IO thread.
-size_t TCPConnector::encode(const char* buffer, size_t size)
+size_t TCPConnector::encode(char* buffer, size_t size)
{
- framing::Buffer out(const_cast<char*>(buffer), size);
+ framing::Buffer out(buffer, size);
size_t bytesWritten(0);
{
Mutex::ScopedLock l(lock);
@@ -318,7 +323,7 @@ void TCPConnector::eof(AsynchIO&) {
void TCPConnector::disconnected(AsynchIO&) {
close();
- socketClosed(*aio, socket);
+ socketClosed(*aio, *socket);
}
void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)