/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "Connection.h" #include "Session.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/log/Statement.h" #include "qpid/sys/OutputControl.h" #include extern "C" { #include #include } namespace qpid { namespace broker { namespace amqp { Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, bool saslInUse) : ManagedConnection(b, i), connection(pn_connection()), transport(pn_transport()), out(o), id(i), broker(b), haveOutput(true) { if (pn_transport_bind(transport, connection)) { //error } out.activateOutput(); bool enableTrace(false); QPID_LOG_TEST_CAT(trace, protocol, enableTrace); if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM); if (!saslInUse) { //feed in a dummy AMQP 1.0 header as engine expects one, but //we already read it (if sasl is in use we read the sasl //header,not the AMQP 1.0 header). std::vector protocolHeader(8); qpid::framing::ProtocolInitiation pi(getVersion()); qpid::framing::Buffer buffer(&protocolHeader[0], protocolHeader.size()); pi.encode(buffer); pn_transport_input(transport, &protocolHeader[0], protocolHeader.size()); //wont get a userid, so set a dummy one on the ManagedConnection to trigger event setUserid("no authentication used"); } } Connection::~Connection() { pn_transport_free(transport); pn_connection_free(connection); } pn_transport_t* Connection::getTransport() { return transport; } size_t Connection::decode(const char* buffer, size_t size) { QPID_LOG(trace, id << " decode(" << size << ")") //TODO: Fix pn_engine_input() to take const buffer ssize_t n = pn_transport_input(transport, const_cast(buffer), size); if (n > 0 || n == PN_EOS) { //If engine returns EOS, have no way of knowing how many bytes //it processed, but can assume none need to be reprocessed so //consider them all read: if (n == PN_EOS) n = size; QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) process(); pn_transport_tick(transport, 0); if (!haveOutput) { haveOutput = true; out.activateOutput(); } return n; } else if (n == PN_ERR) { throw qpid::Exception(QPID_MSG("Error on input: " << getError())); } else { return 0; } } size_t Connection::encode(char* buffer, size_t size) { QPID_LOG(trace, "encode(" << size << ")") ssize_t n = pn_transport_output(transport, buffer, size); if (n > 0) { QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) haveOutput = true; return n; } else if (n == PN_EOS) { haveOutput = size; return size;//Is this right? } else if (n == PN_ERR) { throw qpid::Exception(QPID_MSG("Error on output: " << getError())); } else { haveOutput = false; return 0; } } bool Connection::canEncode() { for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) { if (i->second->dispatch()) haveOutput = true; } process(); //TODO: proper handling of time in and out of tick pn_transport_tick(transport, 0); QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput) return haveOutput; } void Connection::closed() { //TODO: tear down sessions and associated links for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { i->second->close(); } } bool Connection::isClosed() const { return pn_connection_state(connection) & PN_REMOTE_CLOSED; } framing::ProtocolVersion Connection::getVersion() const { return qpid::framing::ProtocolVersion(1,0); } namespace { pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE; pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; } void Connection::process() { QPID_LOG(trace, id << " process()"); if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) { QPID_LOG_CAT(debug, model, id << " connection opened"); pn_connection_set_container(connection, broker.getFederationTag().c_str()); pn_connection_open(connection); } for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) { QPID_LOG_CAT(debug, model, id << " session begun"); pn_session_open(s); boost::shared_ptr ssn(new Session(s, broker, *this, out)); sessions[s] = ssn; } for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) { pn_link_open(l); Sessions::iterator session = sessions.find(pn_link_session(l)); if (session == sessions.end()) { QPID_LOG(error, id << " Link attached on unknown session!"); } else { try { session->second->attach(l); QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l)); } catch (const std::exception& e) { QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); //TODO: set error details on detach when that is exposed via engine API pn_link_close(l); } } } //handle deliveries for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) { pn_link_t* link = pn_delivery_link(delivery); if (pn_link_is_receiver(link)) { Sessions::iterator i = sessions.find(pn_link_session(link)); if (i != sessions.end()) { i->second->incoming(link, delivery); } else { pn_delivery_update(delivery, PN_REJECTED); } } else { //i.e. SENDER Sessions::iterator i = sessions.find(pn_link_session(link)); if (i != sessions.end()) { QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); i->second->outgoing(link, delivery); } else { QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); } } } for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) { pn_link_close(l); Sessions::iterator session = sessions.find(pn_link_session(l)); if (session == sessions.end()) { QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); } else { session->second->detach(l); QPID_LOG_CAT(debug, model, id << " link detached"); } } for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) { pn_session_close(s); Sessions::iterator i = sessions.find(s); if (i != sessions.end()) { i->second->close(); sessions.erase(i); QPID_LOG_CAT(debug, model, id << " session ended"); } else { QPID_LOG(error, id << " peer attempted to close unrecognised session"); } } if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { QPID_LOG_CAT(debug, model, id << " connection closed"); pn_connection_close(connection); } } std::string Connection::getError() { std::stringstream text; pn_error_t* cerror = pn_connection_error(connection); if (cerror) text << "connection error " << pn_error_text(cerror); pn_error_t* terror = pn_transport_error(transport); if (terror) text << "transport error " << pn_error_text(terror); return text.str(); } }}} // namespace qpid::broker::amqp