diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-07-24 00:35:26 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-07-24 00:35:26 +0000 |
| commit | 586e63b99de7711689b0728d7a0c20354256c8dc (patch) | |
| tree | 0559ed6348a880f8ce979a951eaff56ecab62d19 /java/newclient/src | |
| parent | 42238d6f0a49bd9311229752c07278329b90e05c (diff) | |
| download | qpid-python-586e63b99de7711689b0728d7a0c20354256c8dc.tar.gz | |
adding synapse exchange
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@558903 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/newclient/src')
5 files changed, 224 insertions, 69 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java index bde2a7bfea..9aea71f3bb 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java @@ -27,24 +27,13 @@ public class QpidTestClient QpidExchangeHelper exchangeHelper = session.getExchangeHelper(); exchangeHelper.declareExchange(false, false, QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, QpidConstants.DIRECT_EXCHANGE_CLASS); - QpidQueueHelper queueHelper = session.getQueueHelper(); - queueHelper.declareQueue(false, false, false, false, false, "myQueue"); - queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", "RH"); + exchangeHelper.declareExchange(false, false, QpidConstants.SYNAPSE_EXCHANGE_NAME, false, false, false, QpidConstants.SYNAPSE_EXCHANGE_CLASS); + + //contentBasedRoutingSample(session); - MessageHeaders msgHeaders = new MessageHeaders(); - msgHeaders.setRoutingKey(new AMQShortString("RH")); - msgHeaders.setExchange(new AMQShortString(QpidConstants.DIRECT_EXCHANGE_NAME)); - AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,"test".getBytes()); + //transformationSample2(session); - QpidMessageProducer messageProducer = session.createProducer(); - messageProducer.open(); - messageProducer.send(false, true, msg); - - QpidMessageConsumer messageConsumer = session.createConsumer("myQueue", false, false); - messageConsumer.open(); - - AMQPApplicationMessage msg2 = messageConsumer.receive(); - System.out.println(msg.toString()); + binaryMessageTransformations(session); } catch(Exception e) { @@ -53,4 +42,159 @@ public class QpidTestClient } + public static void contentBasedRoutingSample(QpidSession session) throws Exception + { + String tmp = "<m:troubleTicket xmlns:m=\"http://redhat.com/sample\"><m:customerId>532535</m:customerId><m:priority>"; + String tmp2 = "</m:priority><m:appId>ESB</m:appId><m:desc>blabla</m:desc></m:troubleTicket>"; + + //Create queues + QpidQueueHelper queueHelper = session.getQueueHelper(); + queueHelper.declareQueue(false, false, false, false, false, "criticalTicketQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "criticalTicketQueue", "criticalTicket"); + + queueHelper.declareQueue(false, false, false, false, false, "lowPriorityTicketQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "lowPriorityTicketQueue", "lowPriorityTicket"); + + queueHelper.declareQueue(false, false, false, false, false, "ticketQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "ticketQueue", "troubleTicket"); + + QpidMessageProducer messageProducer = session.createProducer(); + messageProducer.open(); + + MessageHeaders msgHeaders = new MessageHeaders(); + msgHeaders.setContentType(new AMQShortString("text/xml")); + msgHeaders.setRoutingKey(new AMQShortString("defectSystem")); + msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + + StringBuffer buf = new StringBuffer(); + buf.append(tmp).append("critical").append(tmp2); + AMQPApplicationMessage criticalMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes()); + System.out.println(criticalMsg.toString()); + messageProducer.send(false, true, criticalMsg); + + buf = new StringBuffer(); + buf.append(tmp).append("low").append(tmp2); + AMQPApplicationMessage lowMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes()); + System.out.println(lowMsg.toString()); + messageProducer.send(false, true, lowMsg); + + buf = new StringBuffer(); + buf.append(tmp).append("high").append(tmp2); + AMQPApplicationMessage highMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes()); + System.out.println(highMsg.toString()); + messageProducer.send(false, true, highMsg); + + QpidMessageConsumer messageConsumerCritical = session.createConsumer("criticalTicketQueue", false, false); + messageConsumerCritical.open(); + AMQPApplicationMessage criticalMsgRcv = messageConsumerCritical.receive(); + System.out.println(criticalMsgRcv.toString()); + + QpidMessageConsumer messageConsumerLow = session.createConsumer("lowPriorityTicketQueue", false, false); + messageConsumerLow.open(); + AMQPApplicationMessage lowMsgRcv = messageConsumerLow.receive(); + System.out.println(lowMsgRcv.toString()); + + QpidMessageConsumer messageConsumer = session.createConsumer("ticketQueue", false, false); + messageConsumer.open(); + AMQPApplicationMessage msgRcv = messageConsumer.receive(); + System.out.println(msgRcv.toString()); + + } + + public static void transformationSample(QpidSession session) throws Exception + { + String tmp = "<m:quote xmlns:m=\"http://redhat.com/sample\"><m:ticker>RHT</m:ticker><m:value>125</m:value></m:quote>"; + + QpidQueueHelper queueHelper = session.getQueueHelper(); + queueHelper.declareQueue(false, false, false, false, false, "stockQuoteQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "stockQuoteQueue", "stockQuote"); + + MessageHeaders msgHeaders = new MessageHeaders(); + msgHeaders.setContentType(new AMQShortString("text/xml")); + msgHeaders.setRoutingKey(new AMQShortString("stockQuote")); + msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + + AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,tmp.getBytes()); + + QpidMessageProducer messageProducer = session.createProducer(); + messageProducer.open(); + System.out.println(msg.toString()); + messageProducer.send(false, true, msg); + } + + public static void transformationSample2(QpidSession session) throws Exception + { + StringBuffer buf = new StringBuffer(); + buf.append("<m:vacationPackage xmlns:m=\"http://redhat.com/sample\">"); + buf.append("<m:customerFirstName>Rajith</m:customerFirstName>"); + buf.append("<m:customerLastName>Rajith</m:customerLastName>"); + buf.append("<m:customerAddress>3349 Missississauga Road,Mississauga,ON, L5L 1J7</m:customerAddress>"); + buf.append("<m:customerDOB>Mississauga</m:customerDOB>"); + buf.append("<m:paymentInfo>Visa,456454574575325325235,05122007</m:paymentInfo>"); + buf.append("<m:start>12072007</m:start>"); + buf.append("<m:end>18072007</m:end>"); + buf.append("<m:airTicket>"); + buf.append("<m:airline>AC</m:airline>"); + buf.append("<m:seatPreference>W</m:seatPreference>"); + buf.append("<m:frequentFlyer>643663345</m:frequentFlyer>"); + buf.append("</m:airTicket>"); + buf.append("<m:Hotel>"); + buf.append("<m:noOfDays>5</m:noOfDays>"); + buf.append("<m:rating>5</m:rating>"); + buf.append("<m:meals>AI</m:meals>"); + buf.append("</m:Hotel>"); + buf.append("<m:carRental>"); + buf.append("<m:from>14062007</m:from>"); + buf.append("<m:to>16062007</m:to>"); + buf.append("</m:carRental>"); + buf.append("</m:vacationPackage>"); + + QpidQueueHelper queueHelper = session.getQueueHelper(); + queueHelper.declareQueue(false, false, false, false, false, "carRentalQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "carRentalQueue", "carRental"); + + queueHelper.declareQueue(false, false, false, false, false, "hotelQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "hotelQueue", "hotel"); + + queueHelper.declareQueue(false, false, false, false, false, "airlineQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "airlineQueue", "airline"); + + MessageHeaders msgHeaders = new MessageHeaders(); + msgHeaders.setContentType(new AMQShortString("text/xml")); + msgHeaders.setRoutingKey(new AMQShortString("vacationPackage")); + msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + + AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes()); + + QpidMessageProducer messageProducer = session.createProducer(); + messageProducer.open(); + System.out.println(msg.toString()); + messageProducer.send(false, true, msg); + } + + public static void binaryMessageTransformations(QpidSession session) throws Exception + { + QpidQueueHelper queueHelper = session.getQueueHelper(); + queueHelper.declareQueue(false, false, false, false, false, "binaryQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "binaryQueue", "binary"); + + MessageHeaders msgHeaders = new MessageHeaders(); + msgHeaders.setContentType(new AMQShortString("application/octet-stream")); + msgHeaders.setRoutingKey(new AMQShortString("binary")); + msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + + byte[] buf = new byte[]{72,101,108,108,111}; + + AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf); + + QpidMessageProducer messageProducer = session.createProducer(); + messageProducer.open(); + System.out.println(msg.toString()); + messageProducer.send(false, true, msg); + } + } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java index 96103509cf..5001d50548 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java @@ -22,6 +22,10 @@ public class QpidConstants public final static String FANOUT_EXCHANGE_CLASS = "fanout"; + public final static String SYNAPSE_EXCHANGE_NAME = "amq.synapse"; + + public final static String SYNAPSE_EXCHANGE_CLASS = "synapse"; + public final static String SYSTEM_MANAGEMENT_EXCHANGE_NAME = "qpid.sysmgmt"; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java index f14825ed31..5725de9caa 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java @@ -8,56 +8,56 @@ import org.apache.qpid.nclient.config.ClientConfiguration; public class PhaseFactory { - /** - * This method will create the pipe and return a reference - * to the top of the pipeline. - * - * The application can then use this (top most) phase and all - * calls will propogated down the pipe. - * - * Simillar calls orginating at the bottom of the pipeline - * will be propogated to the top. - * - * @param ctx - * @return - * @throws AMQPException - */ - public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException - { - String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE; - Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>(); - List<String> list = ClientConfiguration.get().getList(key); - int index = 0; - for(String s:list) + /** + * This method will create the pipe and return a reference + * to the top of the pipeline. + * + * The application can then use this (top most) phase and all + * calls will propogated down the pipe. + * + * Simillar calls orginating at the bottom of the pipeline + * will be propogated to the top. + * + * @param ctx + * @return + * @throws AMQPException + */ + public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException { - try - { - Phase temp = (Phase)Class.forName(s).newInstance(); - phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX),temp) ; - } - catch(Exception e) - { - throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e); - } - index++; - } - - Phase current = null; - Phase prev = null; - Phase next = null; - //Lets build the phase pipe. - for (int i=0; i<phaseMap.size();i++) - { - current = phaseMap.get(i); - if (i+1 < phaseMap.size()) - { - next = phaseMap.get(i+1); - } - current.init(ctx, next, prev); - prev = current; - next = null; + String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE; + Map<Integer, Phase> phaseMap = new HashMap<Integer, Phase>(); + List<String> list = ClientConfiguration.get().getList(key); + int index = 0; + for (String s : list) + { + try + { + Phase temp = (Phase) Class.forName(s).newInstance(); + phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX), temp); + } + catch (Exception e) + { + throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s), e); + } + index++; + } + + Phase current = null; + Phase prev = null; + Phase next = null; + //Lets build the phase pipe. + for (int i = 0; i < phaseMap.size(); i++) + { + current = phaseMap.get(i); + if (i + 1 < phaseMap.size()) + { + next = phaseMap.get(i + 1); + } + current.init(ctx, next, prev); + prev = current; + next = null; + } + + return current; } - - return current; - } }
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java index fe0231522c..57ce513267 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java @@ -70,7 +70,14 @@ public class QpidMessageConsumerImpl extends AbstractResource implements QpidMes public AMQPApplicationMessage receive()throws QpidException { checkClosed(); - return _queue.poll(); + try + { + return _queue.take(); + } + catch (InterruptedException e) + { + throw new QpidException("Error occurred while retrieving message",e); + } } public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java index 91298dcc57..c20a76a247 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java @@ -82,8 +82,8 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes msgHeaders.getContentType(), //contentType msgHeaders.getCorrelationId(), //correlationId msgHeaders.getDeliveryMode(), //deliveryMode non persistant - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange + msgHeaders.getDestination(),// destination + msgHeaders.getExchange(),// exchange msgHeaders.getExpiration(), //expiration msgHeaders.isImmediate(), //immediate msgHeaders.isMandatory(), //mandatory |
