diff options
author | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-12-01 05:11:45 +0000 |
commit | fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66 (patch) | |
tree | a2ebf932750bf13bf3db271f92df390335b0e844 /cpp/lib/client/Connection.cpp | |
parent | 33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (diff) | |
download | qpid-python-fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66.tar.gz |
2006-12-01 Jim Meyering <meyering@redhat.com>
This delta imposes two major changes on the C++ hierarchy:
- adds autoconf, automake, libtool support
- makes the hierarchy flatter and renames a few files (e.g., Queue.h,
Queue.cpp) that appeared twice, once under client/ and again under broker/.
In the process, I've changed many #include directives, mostly
to remove a qpid/ or qpid/framing/ prefix from the file name argument.
Although most changes were to .cpp and .h files under qpid/cpp/, there
were also several to template files under qpid/gentools, and even one
to CppGenerator.java.
Nearly all files are moved to a new position in the hierarchy.
The new hierarchy looks like this:
src # this is the new home of qpidd.cpp
tests # all tests are here. See Makefile.am.
gen # As before, all generated files go here.
lib # This is just a container for the 3 lib dirs:
lib/client
lib/broker
lib/common
lib/common/framing
lib/common/sys
lib/common/sys/posix
lib/common/sys/apr
build-aux
m4
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/Connection.cpp')
-rw-r--r-- | cpp/lib/client/Connection.cpp | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp new file mode 100644 index 0000000000..9c81192573 --- /dev/null +++ b/cpp/lib/client/Connection.cpp @@ -0,0 +1,244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <Connection.h> +#include <ClientChannel.h> +#include <ClientMessage.h> +#include <QpidError.h> +#include <iostream> +#include <MethodBodyInstances.h> + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::sys; +using namespace qpid::sys; + +u_int16_t Connection::channelIdCounter; + +Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true), +// AMQP version management change - kpvdr 2006-11-20 +// TODO: Make this class version-aware and link these hard-wired numbers to that version + version(8, 0) +{ + connector = new Connector(debug, _max_frame_size); +} + +Connection::~Connection(){ + delete connector; +} + +void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ + host = _host; + port = _port; + connector->setInputHandler(this); + connector->setTimeoutHandler(this); + connector->setShutdownHandler(this); + out = connector->getOutputHandler(); + connector->connect(host, port); + + ProtocolInitiation* header = new ProtocolInitiation(8, 0); + responses.expect(); + connector->init(header); + responses.receive(method_bodies.connection_start); + + FieldTable props; + string mechanism("PLAIN"); + string response = ((char)0) + uid + ((char)0) + pwd; + string locale("en_US"); + responses.expect(); + out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); + + /** + * Assume for now that further challenges will not be required + //receive connection.secure + responses.receive(connection_secure)); + //send connection.secure-ok + out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); + **/ + + responses.receive(method_bodies.connection_tune); + + ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse()); + out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); + + u_int16_t heartbeat = proposal->getHeartbeat(); + connector->setReadTimeout(heartbeat * 2); + connector->setWriteTimeout(heartbeat); + + //send connection.open + string capabilities; + string vhost = virtualhost; + responses.expect(); + out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true))); + //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). + responses.waitForResponse(); + if(responses.validate(method_bodies.connection_open_ok)){ + //ok + }else if(responses.validate(method_bodies.connection_redirect)){ + //ignore for now + ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse())); + std::cout << "Received redirection to " << redirect->getHost() << std::endl; + }else{ + THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); + } + +} + +void Connection::close(){ + if(!closed){ + u_int16_t code(200); + string text("Ok"); + u_int16_t classId(0); + u_int16_t methodId(0); + + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); + connector->close(); + } +} + +void Connection::openChannel(Channel* channel){ + channel->con = this; + channel->id = ++channelIdCounter; + channel->out = out; + channels[channel->id] = channel; + //now send frame to open channel and wait for response + string oob; + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); + channel->setQos(); + channel->closed = false; +} + +void Connection::closeChannel(Channel* channel){ + //send frame to close channel + u_int16_t code(200); + string text("Ok"); + u_int16_t classId(0); + u_int16_t methodId(0); + closeChannel(channel, code, text, classId, methodId); +} + +void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ + //send frame to close channel + channel->cancelAll(); + channel->closed = true; + channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); + channel->con = 0; + channel->out = 0; + removeChannel(channel); +} + +void Connection::removeChannel(Channel* channel){ + //send frame to close channel + + channels.erase(channel->id); + channel->out = 0; + channel->id = 0; + channel->con = 0; +} + +void Connection::received(AMQFrame* frame){ + u_int16_t channelId = frame->getChannel(); + + if(channelId == 0){ + this->handleBody(frame->getBody()); + }else{ + Channel* channel = channels[channelId]; + if(channel == 0){ + error(504, "Unknown channel"); + }else{ + try{ + channel->handleBody(frame->getBody()); + }catch(qpid::QpidError e){ + channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); + } + } + } +} + +void Connection::handleMethod(AMQMethodBody::shared_ptr body){ + //connection.close, basic.deliver, basic.return or a response to a synchronous request + if(responses.isWaiting()){ + responses.signalResponse(body); + }else if(method_bodies.connection_close.match(body.get())){ + //send back close ok + //close socket + ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get()); + std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; + connector->close(); + }else{ + std::cout << "Unhandled method for connection: " << *body << std::endl; + error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); + } +} + +void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ + error(504, "Channel error: received header body with channel 0."); +} + +void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ + error(504, "Channel error: received content body with channel 0."); +} + +void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ +} + +void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ + responses.expect(); + out->send(frame); + responses.receive(body); +} + +void Connection::error(int code, const string& msg, int classid, int methodid){ + std::cout << "Connection exception generated: " << code << msg; + if(classid || methodid){ + std::cout << " [" << methodid << ":" << classid << "]"; + } + std::cout << std::endl; + sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); + connector->close(); +} + +void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ + std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl; + int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; + string msg = e.msg; + if(method == 0){ + closeChannel(channel, code, msg); + }else{ + closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); + } +} + +void Connection::idleIn(){ + std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; + connector->close(); +} + +void Connection::idleOut(){ + out->send(new AMQFrame(0, new AMQHeartbeatBody())); +} + +void Connection::shutdown(){ + closed = true; + //close all channels + for(iterator i = channels.begin(); i != channels.end(); i++){ + i->second->stop(); + } +} |