/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include #include #include #include #include using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; using namespace qpid::sys; u_int16_t Connection::channelIdCounter; Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true), // AMQP version management change - kpvdr 2006-11-20 // TODO: Make this class version-aware and link these hard-wired numbers to that version version(8, 0) { connector = new Connector(debug, _max_frame_size); } Connection::~Connection(){ delete connector; } void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ host = _host; port = _port; connector->setInputHandler(this); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); out = connector->getOutputHandler(); connector->connect(host, port); ProtocolInitiation* header = new ProtocolInitiation(8, 0); responses.expect(); connector->init(header); responses.receive(method_bodies.connection_start); FieldTable props; string mechanism("PLAIN"); string response = ((char)0) + uid + ((char)0) + pwd; string locale("en_US"); responses.expect(); out->send(new AMQFrame(0, new ConnectionStartOkBody(version, props, mechanism, response, locale))); /** * Assume for now that further challenges will not be required //receive connection.secure responses.receive(connection_secure)); //send connection.secure-ok out->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ responses.receive(method_bodies.connection_tune); ConnectionTuneBody::shared_ptr proposal = boost::dynamic_pointer_cast(responses.getResponse()); out->send(new AMQFrame(0, new ConnectionTuneOkBody(version, proposal->getChannelMax(), max_frame_size, proposal->getHeartbeat()))); u_int16_t heartbeat = proposal->getHeartbeat(); connector->setReadTimeout(heartbeat * 2); connector->setWriteTimeout(heartbeat); //send connection.open string capabilities; string vhost = virtualhost; responses.expect(); out->send(new AMQFrame(0, new ConnectionOpenBody(version, vhost, capabilities, true))); //receive connection.open-ok (or redirect, but ignore that for now esp. as using force=true). responses.waitForResponse(); if(responses.validate(method_bodies.connection_open_ok)){ //ok }else if(responses.validate(method_bodies.connection_redirect)){ //ignore for now ConnectionRedirectBody::shared_ptr redirect(boost::dynamic_pointer_cast(responses.getResponse())); std::cout << "Received redirection to " << redirect->getHost() << std::endl; }else{ THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response"); } } void Connection::close(){ if(!closed){ u_int16_t code(200); string text("Ok"); u_int16_t classId(0); u_int16_t methodId(0); sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, text, classId, methodId)), method_bodies.connection_close_ok); connector->close(); } } void Connection::openChannel(Channel* channel){ channel->con = this; channel->id = ++channelIdCounter; channel->out = out; channels[channel->id] = channel; //now send frame to open channel and wait for response string oob; channel->sendAndReceive(new AMQFrame(channel->id, new ChannelOpenBody(version, oob)), method_bodies.channel_open_ok); channel->setQos(); channel->closed = false; } void Connection::closeChannel(Channel* channel){ //send frame to close channel u_int16_t code(200); string text("Ok"); u_int16_t classId(0); u_int16_t methodId(0); closeChannel(channel, code, text, classId, methodId); } void Connection::closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId, u_int16_t methodId){ //send frame to close channel channel->cancelAll(); channel->closed = true; channel->sendAndReceive(new AMQFrame(channel->id, new ChannelCloseBody(version, code, text, classId, methodId)), method_bodies.channel_close_ok); channel->con = 0; channel->out = 0; removeChannel(channel); } void Connection::removeChannel(Channel* channel){ //send frame to close channel channels.erase(channel->id); channel->out = 0; channel->id = 0; channel->con = 0; } void Connection::received(AMQFrame* frame){ u_int16_t channelId = frame->getChannel(); if(channelId == 0){ this->handleBody(frame->getBody()); }else{ Channel* channel = channels[channelId]; if(channel == 0){ error(504, "Unknown channel"); }else{ try{ channel->handleBody(frame->getBody()); }catch(qpid::QpidError e){ channelException(channel, dynamic_cast(frame->getBody().get()), e); } } } } void Connection::handleMethod(AMQMethodBody::shared_ptr body){ //connection.close, basic.deliver, basic.return or a response to a synchronous request if(responses.isWaiting()){ responses.signalResponse(body); }else if(method_bodies.connection_close.match(body.get())){ //send back close ok //close socket ConnectionCloseBody* request = dynamic_cast(body.get()); std::cout << "Connection closed by server: " << request->getReplyCode() << ":" << request->getReplyText() << std::endl; connector->close(); }else{ std::cout << "Unhandled method for connection: " << *body << std::endl; error(504, "Unrecognised method", body->amqpClassId(), body->amqpMethodId()); } } void Connection::handleHeader(AMQHeaderBody::shared_ptr /*body*/){ error(504, "Channel error: received header body with channel 0."); } void Connection::handleContent(AMQContentBody::shared_ptr /*body*/){ error(504, "Channel error: received content body with channel 0."); } void Connection::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Connection::sendAndReceive(AMQFrame* frame, const AMQMethodBody& body){ responses.expect(); out->send(frame); responses.receive(body); } void Connection::error(int code, const string& msg, int classid, int methodid){ std::cout << "Connection exception generated: " << code << msg; if(classid || methodid){ std::cout << " [" << methodid << ":" << classid << "]"; } std::cout << std::endl; sendAndReceive(new AMQFrame(0, new ConnectionCloseBody(version, code, msg, classid, methodid)), method_bodies.connection_close_ok); connector->close(); } void Connection::channelException(Channel* channel, AMQMethodBody* method, QpidError& e){ std::cout << "Caught error from channel [" << e.code << "] " << e.msg << " (" << e.location.file << ":" << e.location.line << ")" << std::endl; int code = e.code == PROTOCOL_ERROR ? e.code - PROTOCOL_ERROR : 500; string msg = e.msg; if(method == 0){ closeChannel(channel, code, msg); }else{ closeChannel(channel, code, msg, method->amqpClassId(), method->amqpMethodId()); } } void Connection::idleIn(){ std::cout << "Connection timed out due to abscence of heartbeat." << std::endl; connector->close(); } void Connection::idleOut(){ out->send(new AMQFrame(0, new AMQHeartbeatBody())); } void Connection::shutdown(){ closed = true; //close all channels for(iterator i = channels.begin(); i != channels.end(); i++){ i->second->stop(); } }