/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/FedOps.h" #include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/SessionState.h" #include "qpid/management/ManagementAgent.h" #include "qpid/types/Variant.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/framing/Uuid.h" #include "qpid/framing/MessageProperties.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include using qpid::framing::FieldTable; using qpid::framing::Uuid; using qpid::framing::Buffer; using qpid::framing::AMQFrame; using qpid::framing::AMQContentBody; using qpid::framing::AMQHeaderBody; using qpid::framing::MessageProperties; using qpid::framing::MessageTransferBody; using qpid::types::Variant; using qpid::management::ManagementAgent; using std::string; namespace _qmf = qmf::org::apache::qpid::broker; namespace { const std::string QPID_REPLICATE("qpid.replicate"); const std::string NONE("none"); const uint8_t EXPLICIT_ACK(0); // msg.accept required to be sent const uint8_t IMPLIED_ACK(1); // msg.accept assumed, not sent } namespace qpid { namespace broker { void Bridge::PushHandler::handle(framing::AMQFrame& frame) { conn->received(frame); } Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args, InitializeCallback init, const std::string& _queueName, const string& ae) : link(_link), channel(_id), args(_args), listener(l), name(_name), queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag() : _queueName), altEx(ae), persistenceId(0), conn(0), initialize(init), detached(false), useExistingQueue(!_queueName.empty()), sessionName("qpid.bridge_session_" + name + "_" + link->getBroker()->getFederationTag()) { // If both acks (i_sync) and limited credit is configured, then we'd // better be able to sync before running out of credit or we // may stall (note: i_credit==0 means "unlimited") if (args.i_credit && args.i_sync && args.i_sync > args.i_credit) throw Exception("The credit value must be greater than configured sync (ack) interval."); ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync, args.i_credit)); mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest); } Bridge::~Bridge() { mgmtObject->resourceDestroy(); } void Bridge::create(amqp_0_10::Connection& c) { detached = false; // Reset detached in case we are recovering. conn = &c; SessionHandler& sessionHandler = c.getChannel(channel); sessionHandler.setErrorListener(shared_from_this()); if (args.i_srcIsLocal) { if (args.i_dynamic) throw Exception("Dynamic routing not supported for push routes"); // Point the bridging commands at the local connection handler pushHandler.reset(new PushHandler(&c)); channelHandler.reset(new framing::ChannelHandler(channel, pushHandler.get())); session.reset(new framing::AMQP_ServerProxy::Session(*channelHandler)); peer.reset(new framing::AMQP_ServerProxy(*channelHandler)); session->attach(sessionName, false); session->commandPoint(0,0); } else { sessionHandler.attachAs(sessionName); // Point the bridging commands at the remote peer broker peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); } if (args.i_srcIsLocal) sessionHandler.getSession()->disableReceiverTracking(); if (initialize) { initialize(*this, sessionHandler); // custom subscription initializer supplied } else { // will a temp queue be created for this bridge? const bool temp_queue = !args.i_srcIsQueue && !useExistingQueue; // UI convention: user specifies 0 for infinite credit const uint32_t credit = (args.i_credit == 0) ? LinkRegistry::INFINITE_CREDIT : args.i_credit; // use explicit acks only for non-temp queues, useless for temp queues since they are // destroyed when the session drops (can't resend unacked msgs) const uint8_t ack_mode = (args.i_sync && !temp_queue) ? EXPLICIT_ACK : IMPLIED_ACK; // configure command.sync frequency FieldTable options; uint32_t freq = 0; if (ack_mode == EXPLICIT_ACK) { // user explicitly configured syncs freq = uint32_t(args.i_sync); } else if (credit && credit != LinkRegistry::INFINITE_CREDIT) { // force occasional sync to keep from stalling due to lack of credit freq = (credit + 1)/2; } if (freq) options.setInt("qpid.sync_frequency", freq); // create a subscription on the remote if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, ack_mode, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, credit); // message credit peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); // byte credit QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); } else { if (!useExistingQueue) { FieldTable queueSettings; if (args.i_tag.size()) { queueSettings.setString("qpid.trace.id", args.i_tag); } else { const string& peerTag = c.getFederationPeerTag(); if (peerTag.size()) queueSettings.setString("qpid.trace.id", peerTag); } if (args.i_excludes.size()) { queueSettings.setString("qpid.trace.exclude", args.i_excludes); } else { const string& localTag = link->getBroker()->getFederationTag(); if (localTag.size()) queueSettings.setString("qpid.trace.exclude", localTag); } bool durable = false;//should this be an arg, or would we use srcIsQueue for durable queues? bool exclusive = true; // only exclusive if the queue is owned by the bridge bool autoDelete = exclusive && !durable;//auto delete transient queues? peer->getQueue().declare(queueName, altEx, false, durable, exclusive, autoDelete, queueSettings); } if (!args.i_dynamic) peer->getExchange().bind(queueName, args.i_src, args.i_key, FieldTable()); peer->getMessage().subscribe(queueName, args.i_dest, ack_mode, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, credit); peer->getMessage().flow(args.i_dest, 1, LinkRegistry::INFINITE_CREDIT); if (args.i_dynamic) { Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); if (exchange.get() == 0) throw Exception("Exchange not found for dynamic route"); exchange->registerDynamicBridge(this); QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src); } else { QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest); } } } if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking(); } void Bridge::cancel(amqp_0_10::Connection& c) { // If &c != conn then we have failed over so the old connection is closed. if (&c == conn && resetProxy()) { peer->getMessage().cancel(args.i_dest); peer->getSession().detach(sessionName); } QPID_LOG(debug, "Cancelled bridge " << name); } /** Notify the bridge that the connection has closed */ void Bridge::closed() { if (args.i_dynamic) { Exchange::shared_ptr exchange = link->getBroker()->getExchanges().find(args.i_src); if (exchange.get()) exchange->removeDynamicBridge(this); } QPID_LOG(debug, "Closed bridge " << name); } /** Shut down the bridge */ void Bridge::close() { listener(this); // ask the LinkRegistry to destroy us } void Bridge::setPersistenceId(uint64_t pId) const { persistenceId = pId; } const std::string Bridge::ENCODED_IDENTIFIER("bridge.v2"); const std::string Bridge::ENCODED_IDENTIFIER_V1("bridge"); bool Bridge::isEncodedBridge(const std::string& key) { return key == ENCODED_IDENTIFIER || key == ENCODED_IDENTIFIER_V1; } Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer) { string kind; buffer.getShortString(kind); string host; uint16_t port; string src; string dest; string key; string id; string excludes; string name; Link::shared_ptr link; if (kind == ENCODED_IDENTIFIER_V1) { /** previous versions identified the bridge by host:port, not by name, and * transport wasn't provided. Try to find a link using those paramters. */ buffer.getShortString(host); port = buffer.getShort(); link = links.getLink(host, port); if (!link) { QPID_LOG(error, "Bridge::decode() failed: cannot find Link for host=" << host << ", port=" << port); return Bridge::shared_ptr(); } } else { string linkName; buffer.getShortString(name); buffer.getShortString(linkName); link = links.getLink(linkName); if (!link) { QPID_LOG(error, "Bridge::decode() failed: cannot find Link named='" << linkName << "'"); return Bridge::shared_ptr(); } } bool durable(buffer.getOctet()); buffer.getShortString(src); buffer.getShortString(dest); buffer.getShortString(key); bool is_queue(buffer.getOctet()); bool is_local(buffer.getOctet()); buffer.getShortString(id); buffer.getShortString(excludes); bool dynamic(buffer.getOctet()); uint16_t sync = buffer.getShort(); uint32_t credit = buffer.getLong(); if (kind == ENCODED_IDENTIFIER_V1) { /** previous versions did not provide a name for the bridge, so create one */ name = createName(link->getName(), src, dest, key); } return links.declare(name, *link, durable, src, dest, key, is_queue, is_local, id, excludes, dynamic, sync, credit).first; } void Bridge::encode(Buffer& buffer) const { buffer.putShortString(ENCODED_IDENTIFIER); buffer.putShortString(name); buffer.putShortString(link->getName()); buffer.putOctet(args.i_durable ? 1 : 0); buffer.putShortString(args.i_src); buffer.putShortString(args.i_dest); buffer.putShortString(args.i_key); buffer.putOctet(args.i_srcIsQueue ? 1 : 0); buffer.putOctet(args.i_srcIsLocal ? 1 : 0); buffer.putShortString(args.i_tag); buffer.putShortString(args.i_excludes); buffer.putOctet(args.i_dynamic ? 1 : 0); buffer.putShort(args.i_sync); buffer.putLong(args.i_credit); } uint32_t Bridge::encodedSize() const { return ENCODED_IDENTIFIER.size() + 1 // +1 byte length + name.size() + 1 + link->getName().size() + 1 + 1 // durable + args.i_src.size() + 1 + args.i_dest.size() + 1 + args.i_key.size() + 1 + 1 // srcIsQueue + 1 // srcIsLocal + args.i_tag.size() + 1 + args.i_excludes.size() + 1 + 1 // dynamic + 2 // sync + 4; // credit } management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const { return mgmtObject; } management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, management::Args& /*args*/, string&) { if (methodId == _qmf::Bridge::METHOD_CLOSE) { //notify that we are closed QPID_LOG(debug, "Bridge::close() method called on bridge '" << name << "'"); close(); return management::Manageable::STATUS_OK; } else { return management::Manageable::STATUS_UNKNOWN_METHOD; } } void Bridge::propagateBinding(const string& key, const string& tagList, const string& op, const string& origin, qpid::framing::FieldTable* extra_args) { const string& localTag = link->getBroker()->getFederationTag(); const string& peerTag = conn->getFederationPeerTag(); if (tagList.find(peerTag) == tagList.npos) { FieldTable bindArgs; if (extra_args) { for (qpid::framing::FieldTable::ValueMap::iterator i=extra_args->begin(); i != extra_args->end(); ++i) { bindArgs.insert((*i)); } } string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag); bindArgs.setString(QPID_REPLICATE, NONE); bindArgs.setString(qpidFedOp, op); bindArgs.setString(qpidFedTags, newTagList); if (origin.empty()) bindArgs.setString(qpidFedOrigin, localTag); else bindArgs.setString(qpidFedOrigin, origin); conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, queueName, args.i_src, key, bindArgs)); } } void Bridge::sendReorigin() { FieldTable bindArgs; bindArgs.setString(qpidFedOp, fedOpReorigin); bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag()); conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this, queueName, args.i_src, args.i_key, bindArgs)); } bool Bridge::resetProxy() { SessionHandler& sessionHandler = conn->getChannel(channel); if (!sessionHandler.getSession()) peer.reset(); else peer.reset(new framing::AMQP_ServerProxy(sessionHandler.out)); return peer.get(); } void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, FieldTable args) { if (resetProxy()) { peer->getExchange().bind(queue, exchange, key, args); } else { // link's periodic maintenance visit will attempt to recover } } bool Bridge::containsLocalTag(const string& tagList) const { const string& localTag = link->getBroker()->getFederationTag(); return (tagList.find(localTag) != tagList.npos); } const string& Bridge::getLocalTag() const { return link->getBroker()->getFederationTag(); } // SessionHandler::ErrorListener methods. void Bridge::connectionException( framing::connection::CloseCode code, const std::string& msg) { if (errorListener) errorListener->connectionException(code, msg); } void Bridge::channelException( framing::session::DetachCode code, const std::string& msg) { if (errorListener) errorListener->channelException(code, msg); } void Bridge::executionException( framing::execution::ErrorCode code, const std::string& msg) { if (errorListener) errorListener->executionException(code, msg); } void Bridge::incomingExecutionException( framing::execution::ErrorCode code, const std::string& msg) { if (errorListener) errorListener->incomingExecutionException(code, msg); } void Bridge::detach() { detached = true; if (errorListener) errorListener->detach(); } std::string Bridge::createName(const std::string& linkName, const std::string& src, const std::string& dest, const std::string& key) { std::stringstream keystream; keystream << linkName << "!" << src << "!" << dest << "!" << key; return keystream.str(); } }}