/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

/**
 * This program provides an example of using AMQP for a request-response
 * style system. 'Requests' are messages sent to a well known
 * destination. A 'service' process consumes these messages and
 * responds by echoing each message back to the sender on a
 * sender-specified private queue.
 */

// NOTE(review): none of the nine #include directives below carries a header
// name in the text under review (the targets appear to have been stripped).
// They are left untouched here; restore the original header names before
// attempting to build this file.
#include
#include
#include
#include
#include
#include
#include
#include
#include

using namespace qpid::client;
using namespace qpid::sys;
using std::string;

/**
 * A message listener implementation representing the 'service'; it
 * 'echoes' any requests received.
 *
 * The listener keeps the Channel pointer handed to its constructor and
 * publishes each reply through it. Within this file the pointer is the
 * address of a Channel local to main(), so the listener does not own it.
 */
class EchoServer : public MessageListener{
    // Channel used to publish the echoed replies; the pointer itself is const.
    Channel* const channel;
public:
    EchoServer(Channel* channel);
    // Invoked for each request delivered to this listener.
    virtual void received(Message& msg);
};

/**
 * A message listener implementation that merely prints received
 * messages to the console. Used to report on 'echo' responses.
 */
class LoggingListener : public MessageListener{
public:
    // Invoked for each echoed response delivered to this listener.
    virtual void received(Message& msg);
};

/**
 * A utility class that manages the command line options needed to run
 * the example configurably.
*/ class Args{ string host; int port; bool trace; bool help; bool client; public: inline Args() : host("localhost"), port(5672), trace(false), help(false), client(false){} void parse(int argc, char** argv); void usage(); inline const string& getHost() const { return host;} inline int getPort() const { return port; } inline bool getTrace() const { return trace; } inline bool getHelp() const { return help; } inline bool getClient() const { return client; } }; /** * The main test path. There are two basic modes: 'client' and * 'service'. First one or more services are started, then one or more * clients are started and messages can be sent. */ int main(int argc, char** argv){ const std::string echo_service("echo_service"); Args args; args.parse(argc, argv); if (args.getHelp()) { args.usage(); } else if (args.getClient()) { //we have been started in 'client' mode, i.e. we will send an //echo requests and print responses received. try { //Create connection & open a channel Connection connection(args.getTrace()); connection.open(args.getHost(), args.getPort()); Channel channel; connection.openChannel(&channel); //Setup: declare the private 'response' queue and bind it //to the direct exchange by its name which will be //generated by the server Queue response; channel.declareQueue(response); qpid::framing::FieldTable emptyArgs; channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, response, response.getName(), emptyArgs); //Consume from the response queue, logging all echoed message to console: LoggingListener listener; std::string tag; channel.consume(response, tag, &listener); //Process incoming requests on a new thread channel.start(); //get messages from console and send them: std::string text; std::cout << "Enter text to send:" << std::endl; while (std::getline(std::cin, text)) { std::cout << "Sending " << text << " to echo server." 
<< std::endl; Message msg; msg.getHeaders().setString("RESPONSE_QUEUE", response.getName()); msg.setData(text); channel.publish(msg, Exchange::STANDARD_DIRECT_EXCHANGE, echo_service); std::cout << "Enter text to send:" << std::endl; } connection.close(); } catch(qpid::QpidError error) { std::cout << error.what() << std::endl; } } else { // we are in 'service' mode, i.e. we will consume messages // from the request queue and echo each request back to the // senders own private response queue. try { //Create connection & open a channel Connection connection(args.getTrace()); connection.open(args.getHost(), args.getPort()); Channel channel; connection.openChannel(&channel); //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name Queue request("request"); channel.declareQueue(request); qpid::framing::FieldTable emptyArgs; channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, request, echo_service, emptyArgs); //Consume from the request queue, echoing back all messages received to the client that sent them EchoServer server(&channel); std::string tag = "server_tag"; channel.consume(request, tag, &server); //Process incoming requests on the main thread channel.run(); connection.close(); } catch(qpid::QpidError error) { std::cout << error.what() << std::endl; } } } EchoServer::EchoServer(Channel* _channel) : channel(_channel){} void EchoServer::received(Message& message) { //get name of response queues binding to the default direct exchange: const std::string name = message.getHeaders().getString("RESPONSE_QUEUE"); if (name.empty()) { std::cout << "Cannot echo " << message.getData() << ", no response queue specified." 
<< std::endl; } else { //print message to console: std::cout << "Echoing " << message.getData() << " back to " << name << std::endl; //'echo' the message back: channel->publish(message, Exchange::STANDARD_DIRECT_EXCHANGE, name); } } void LoggingListener::received(Message& message) { //print message to console: std::cout << "Received echo: " << message.getData() << std::endl; } void Args::parse(int argc, char** argv){ for(int i = 1; i < argc; i++){ string name(argv[i]); if("-help" == name){ help = true; break; }else if("-host" == name){ host = argv[++i]; }else if("-port" == name){ port = atoi(argv[++i]); }else if("-trace" == name){ trace = true; }else if("-client" == name){ client = true; }else{ std::cout << "Warning: unrecognised option " << name << std::endl; } } } void Args::usage(){ std::cout << "Options:" << std::endl; std::cout << " -help" << std::endl; std::cout << " Prints this usage message" << std::endl; std::cout << " -host " << std::endl; std::cout << " Specifies host to connect to (default is localhost)" << std::endl; std::cout << " -port " << std::endl; std::cout << " Specifies port to conect to (default is 5762)" << std::endl; std::cout << " -trace" << std::endl; std::cout << " Indicates that the frames sent and received should be logged" << std::endl; std::cout << " -client" << std::endl; std::cout << " Run as a client (else will run as a server)" << std::endl; }