summaryrefslogtreecommitdiff
path: root/cpp/lib/client/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-12-01 05:11:45 +0000
committerAlan Conway <aconway@apache.org>2006-12-01 05:11:45 +0000
commitfb9ad93a3d422c1e83c998f44c4782f7bf1d1a66 (patch)
treea2ebf932750bf13bf3db271f92df390335b0e844 /cpp/lib/client/Connection.cpp
parent33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (diff)
downloadqpid-python-fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66.tar.gz
2006-12-01 Jim Meyering <meyering@redhat.com>
This delta imposes two major changes on the C++ hierarchy: - adds autoconf, automake, libtool support - makes the hierarchy flatter and renames a few files (e.g., Queue.h, Queue.cpp) that appeared twice, once under client/ and again under broker/. In the process, I've changed many #include directives, mostly to remove a qpid/ or qpid/framing/ prefix from the file name argument. Although most changes were to .cpp and .h files under qpid/cpp/, there were also several to template files under qpid/gentools, and even one to CppGenerator.java. Nearly all files are moved to a new position in the hierarchy. The new hierarchy looks like this: src # this is the new home of qpidd.cpp tests # all tests are here. See Makefile.am. gen # As before, all generated files go here. lib # This is just a container for the 3 lib dirs: lib/client lib/broker lib/common lib/common/framing lib/common/sys lib/common/sys/posix lib/common/sys/apr build-aux m4 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/client/Connection.cpp')
-rw-r--r--cpp/lib/client/Connection.cpp244
1 files changed, 244 insertions, 0 deletions
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
new file mode 100644
index 0000000000..9c81192573
--- /dev/null
+++ b/cpp/lib/client/Connection.cpp
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <Connection.h>
+#include <ClientChannel.h>
+#include <ClientMessage.h>
+#include <QpidError.h>
+#include <iostream>
+#include <MethodBodyInstances.h>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::sys;
+using namespace qpid::sys;
+
+u_int16_t Connection::channelIdCounter;
+
+Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true),
+// AMQP version management change - kpvdr 2006-11-20
+// TODO: Make this class version-aware and link these hard-wired numbers to that version
+ version(8, 0)
+{
+ connector = new Connector(debug, _max_frame_size);
+}
+
+Connection::~Connection(){
+ delete connector;
+}
+
+void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
+ host = _host;
+ port = _port;
+ connector->setInputHandler(this);
+ connector->setTimeoutHandler(this);
+ connector->setShutdownHandler(this);
+ out = connector->getOutputHandler();
+ connector->connect(host, port);
+
+ ProtocolInitiation* header = new ProtocolInitiation(8, 0);
+ responses.expect();
+ connector->init(header);
+ responses.receive(method_bodies.connection_start);
+
+ FieldTable props;
+ string mechanism("PLAIN");
+ string response = ((char)0) + uid + ((char)0) + pwd;
+ string locale("en_US");
+ responses.expect();
+ out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale)));
+
+ /**
+ * Assume for now that further challenges will not be required
+ //receive connection.secure
+ responses.receive(connection_secure));
+ //send connection.secure-ok
+ out->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
+ **/
+
+ responses.receive(method_bodies.connection_tune);
+
+ ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast<ConnectionTuneBody, AMQMethodBody>(responses.getResponse());
+ out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat())));
+
+ u_int16_t heartbeat = proposal->getHeartbeat();
+ connector->setReadTimeout(heartbeat * 2);
+ connector->setWriteTimeout(heartbeat);
+
+ //send connection.open
+ string capabilities;
+ string vhost = virtualhost;
+ responses.expect();
+ out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true)));
+ //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true).
+ responses.waitForResponse();
+ if(responses.validate(method_bodies.connection_open_ok)){
+ //ok
+ }else if(responses.validate(method_bodies.connection_redirect)){
+ //ignore for now
+ ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast<ConnectionRedirectBody, AMQMethodBody>(responses.getResponse()));
+ std::cout << "Received redirection to " << redirect->getHost() << std::endl;
+ }else{
+ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response");
+ }
+
+}
+
+void Connection::close(){
+ if(!closed){
+ u_int16_t code(200);
+ string text("Ok");
+ u_int16_t classId(0);
+ u_int16_t methodId(0);
+
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok);
+ connector->close();
+ }
+}
+
+void Connection::openChannel(Channel* channel){
+ channel->con = this;
+ channel->id = ++channelIdCounter;
+ channel->out = out;
+ channels[channel->id] = channel;
+ //now send frame to open channel and wait for response
+ string oob;
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok);
+ channel->setQos();
+ channel->closed = false;
+}
+
+void Connection::closeChannel(Channel* channel){
+ //send frame to close channel
+ u_int16_t code(200);
+ string text("Ok");
+ u_int16_t classId(0);
+ u_int16_t methodId(0);
+ closeChannel(channel, code, text, classId, methodId);
+}
+
+void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){
+ //send frame to close channel
+ channel->cancelAll();
+ channel->closed = true;
+ channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok);
+ channel->con = 0;
+ channel->out = 0;
+ removeChannel(channel);
+}
+
+void Connection::removeChannel(Channel* channel){
+ //send frame to close channel
+
+ channels.erase(channel->id);
+ channel->out = 0;
+ channel->id = 0;
+ channel->con = 0;
+}
+
+void Connection::received(AMQFrame* frame){
+ u_int16_t channelId = frame->getChannel();
+
+ if(channelId == 0){
+ this->handleBody(frame->getBody());
+ }else{
+ Channel* channel = channels[channelId];
+ if(channel == 0){
+ error(504, "Unknown channel");
+ }else{
+ try{
+ channel->handleBody(frame->getBody());
+ }catch(qpid::QpidError e){
+ channelException(channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
+ }
+ }
+ }
+}
+
+void Connection::handleMethod(AMQMethodBody::shared_ptr body){
+ //connection.close, basic.deliver, basic.return or a response to a synchronous request
+ if(responses.isWaiting()){
+ responses.signalResponse(body);
+ }else if(method_bodies.connection_close.match(body.get())){
+ //send back close ok
+ //close socket
+ ConnectionCloseBody* request = dynamic_cast<ConnectionCloseBody*>(body.get());
+ std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl;
+ connector->close();
+ }else{
+ std::cout << "Unhandled method for connection: " << *body << std::endl;
+ error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId());
+ }
+}
+
+void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){
+ error(504, "Channel error: received header body with channel 0.");
+}
+
+void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){
+ error(504, "Channel error: received content body with channel 0.");
+}
+
+void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
+}
+
+void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){
+ responses.expect();
+ out->send(frame);
+ responses.receive(body);
+}
+
+void Connection::error(int code, const string& msg, int classid, int methodid){
+ std::cout << "Connection exception generated: " << code << msg;
+ if(classid || methodid){
+ std::cout << " [" << methodid << ":" << classid << "]";
+ }
+ std::cout << std::endl;
+ sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok);
+ connector->close();
+}
+
+void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){
+ std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl;
+ int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500;
+ string msg = e.msg;
+ if(method == 0){
+ closeChannel(channel, code, msg);
+ }else{
+ closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId());
+ }
+}
+
+void Connection::idleIn(){
+ std::cout << "Connection timed out due to abscence of heartbeat." << std::endl;
+ connector->close();
+}
+
+void Connection::idleOut(){
+ out->send(new AMQFrame(0, new AMQHeartbeatBody()));
+}
+
+void Connection::shutdown(){
+ closed = true;
+ //close all channels
+ for(iterator i = channels.begin(); i != channels.end(); i++){
+ i->second->stop();
+ }
+}