summaryrefslogtreecommitdiff
path: root/java/newclient/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-07-24 00:35:26 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-07-24 00:35:26 +0000
commit586e63b99de7711689b0728d7a0c20354256c8dc (patch)
tree0559ed6348a880f8ce979a951eaff56ecab62d19 /java/newclient/src
parent42238d6f0a49bd9311229752c07278329b90e05c (diff)
downloadqpid-python-586e63b99de7711689b0728d7a0c20354256c8dc.tar.gz
adding synapse exchange
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@558903 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/newclient/src')
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java176
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java4
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java100
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java9
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java4
5 files changed, 224 insertions, 69 deletions
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
index bde2a7bfea..9aea71f3bb 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
@@ -27,24 +27,13 @@ public class QpidTestClient
QpidExchangeHelper exchangeHelper = session.getExchangeHelper();
exchangeHelper.declareExchange(false, false, QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, QpidConstants.DIRECT_EXCHANGE_CLASS);
- QpidQueueHelper queueHelper = session.getQueueHelper();
- queueHelper.declareQueue(false, false, false, false, false, "myQueue");
- queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", "RH");
+ exchangeHelper.declareExchange(false, false, QpidConstants.SYNAPSE_EXCHANGE_NAME, false, false, false, QpidConstants.SYNAPSE_EXCHANGE_CLASS);
+
+ //contentBasedRoutingSample(session);
- MessageHeaders msgHeaders = new MessageHeaders();
- msgHeaders.setRoutingKey(new AMQShortString("RH"));
- msgHeaders.setExchange(new AMQShortString(QpidConstants.DIRECT_EXCHANGE_NAME));
- AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,"test".getBytes());
+ //transformationSample2(session);
- QpidMessageProducer messageProducer = session.createProducer();
- messageProducer.open();
- messageProducer.send(false, true, msg);
-
- QpidMessageConsumer messageConsumer = session.createConsumer("myQueue", false, false);
- messageConsumer.open();
-
- AMQPApplicationMessage msg2 = messageConsumer.receive();
- System.out.println(msg.toString());
+ binaryMessageTransformations(session);
}
catch(Exception e)
{
@@ -53,4 +42,159 @@ public class QpidTestClient
}
+ public static void contentBasedRoutingSample(QpidSession session) throws Exception
+ {
+ String tmp = "<m:troubleTicket xmlns:m=\"http://redhat.com/sample\"><m:customerId>532535</m:customerId><m:priority>";
+ String tmp2 = "</m:priority><m:appId>ESB</m:appId><m:desc>blabla</m:desc></m:troubleTicket>";
+
+ //Create queues
+ QpidQueueHelper queueHelper = session.getQueueHelper();
+ queueHelper.declareQueue(false, false, false, false, false, "criticalTicketQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "criticalTicketQueue", "criticalTicket");
+
+ queueHelper.declareQueue(false, false, false, false, false, "lowPriorityTicketQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "lowPriorityTicketQueue", "lowPriorityTicket");
+
+ queueHelper.declareQueue(false, false, false, false, false, "ticketQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "ticketQueue", "troubleTicket");
+
+ QpidMessageProducer messageProducer = session.createProducer();
+ messageProducer.open();
+
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setContentType(new AMQShortString("text/xml"));
+ msgHeaders.setRoutingKey(new AMQShortString("defectSystem"));
+ msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+ msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+ StringBuffer buf = new StringBuffer();
+ buf.append(tmp).append("critical").append(tmp2);
+ AMQPApplicationMessage criticalMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+ System.out.println(criticalMsg.toString());
+ messageProducer.send(false, true, criticalMsg);
+
+ buf = new StringBuffer();
+ buf.append(tmp).append("low").append(tmp2);
+ AMQPApplicationMessage lowMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+ System.out.println(lowMsg.toString());
+ messageProducer.send(false, true, lowMsg);
+
+ buf = new StringBuffer();
+ buf.append(tmp).append("high").append(tmp2);
+ AMQPApplicationMessage highMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+ System.out.println(highMsg.toString());
+ messageProducer.send(false, true, highMsg);
+
+ QpidMessageConsumer messageConsumerCritical = session.createConsumer("criticalTicketQueue", false, false);
+ messageConsumerCritical.open();
+ AMQPApplicationMessage criticalMsgRcv = messageConsumerCritical.receive();
+ System.out.println(criticalMsgRcv.toString());
+
+ QpidMessageConsumer messageConsumerLow = session.createConsumer("lowPriorityTicketQueue", false, false);
+ messageConsumerLow.open();
+ AMQPApplicationMessage lowMsgRcv = messageConsumerLow.receive();
+ System.out.println(lowMsgRcv.toString());
+
+ QpidMessageConsumer messageConsumer = session.createConsumer("ticketQueue", false, false);
+ messageConsumer.open();
+ AMQPApplicationMessage msgRcv = messageConsumer.receive();
+ System.out.println(msgRcv.toString());
+
+ }
+
+ public static void transformationSample(QpidSession session) throws Exception
+ {
+ String tmp = "<m:quote xmlns:m=\"http://redhat.com/sample\"><m:ticker>RHT</m:ticker><m:value>125</m:value></m:quote>";
+
+ QpidQueueHelper queueHelper = session.getQueueHelper();
+ queueHelper.declareQueue(false, false, false, false, false, "stockQuoteQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "stockQuoteQueue", "stockQuote");
+
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setContentType(new AMQShortString("text/xml"));
+ msgHeaders.setRoutingKey(new AMQShortString("stockQuote"));
+ msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+ msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,tmp.getBytes());
+
+ QpidMessageProducer messageProducer = session.createProducer();
+ messageProducer.open();
+ System.out.println(msg.toString());
+ messageProducer.send(false, true, msg);
+ }
+
+ public static void transformationSample2(QpidSession session) throws Exception
+ {
+ StringBuffer buf = new StringBuffer();
+ buf.append("<m:vacationPackage xmlns:m=\"http://redhat.com/sample\">");
+ buf.append("<m:customerFirstName>Rajith</m:customerFirstName>");
+ buf.append("<m:customerLastName>Rajith</m:customerLastName>");
+ buf.append("<m:customerAddress>3349 Missississauga Road,Mississauga,ON, L5L 1J7</m:customerAddress>");
+ buf.append("<m:customerDOB>Mississauga</m:customerDOB>");
+ buf.append("<m:paymentInfo>Visa,456454574575325325235,05122007</m:paymentInfo>");
+ buf.append("<m:start>12072007</m:start>");
+ buf.append("<m:end>18072007</m:end>");
+ buf.append("<m:airTicket>");
+ buf.append("<m:airline>AC</m:airline>");
+ buf.append("<m:seatPreference>W</m:seatPreference>");
+ buf.append("<m:frequentFlyer>643663345</m:frequentFlyer>");
+ buf.append("</m:airTicket>");
+ buf.append("<m:Hotel>");
+ buf.append("<m:noOfDays>5</m:noOfDays>");
+ buf.append("<m:rating>5</m:rating>");
+ buf.append("<m:meals>AI</m:meals>");
+ buf.append("</m:Hotel>");
+ buf.append("<m:carRental>");
+ buf.append("<m:from>14062007</m:from>");
+ buf.append("<m:to>16062007</m:to>");
+ buf.append("</m:carRental>");
+ buf.append("</m:vacationPackage>");
+
+ QpidQueueHelper queueHelper = session.getQueueHelper();
+ queueHelper.declareQueue(false, false, false, false, false, "carRentalQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "carRentalQueue", "carRental");
+
+ queueHelper.declareQueue(false, false, false, false, false, "hotelQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "hotelQueue", "hotel");
+
+ queueHelper.declareQueue(false, false, false, false, false, "airlineQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "airlineQueue", "airline");
+
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setContentType(new AMQShortString("text/xml"));
+ msgHeaders.setRoutingKey(new AMQShortString("vacationPackage"));
+ msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+ msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+
+ QpidMessageProducer messageProducer = session.createProducer();
+ messageProducer.open();
+ System.out.println(msg.toString());
+ messageProducer.send(false, true, msg);
+ }
+
+ public static void binaryMessageTransformations(QpidSession session) throws Exception
+ {
+ QpidQueueHelper queueHelper = session.getQueueHelper();
+ queueHelper.declareQueue(false, false, false, false, false, "binaryQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "binaryQueue", "binary");
+
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setContentType(new AMQShortString("application/octet-stream"));
+ msgHeaders.setRoutingKey(new AMQShortString("binary"));
+ msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+ msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+ byte[] buf = new byte[]{72,101,108,108,111};
+
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf);
+
+ QpidMessageProducer messageProducer = session.createProducer();
+ messageProducer.open();
+ System.out.println(msg.toString());
+ messageProducer.send(false, true, msg);
+ }
+
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
index 96103509cf..5001d50548 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
@@ -22,6 +22,10 @@ public class QpidConstants
public final static String FANOUT_EXCHANGE_CLASS = "fanout";
+ public final static String SYNAPSE_EXCHANGE_NAME = "amq.synapse";
+
+ public final static String SYNAPSE_EXCHANGE_CLASS = "synapse";
+
public final static String SYSTEM_MANAGEMENT_EXCHANGE_NAME = "qpid.sysmgmt";
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
index f14825ed31..5725de9caa 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
@@ -8,56 +8,56 @@ import org.apache.qpid.nclient.config.ClientConfiguration;
public class PhaseFactory
{
- /**
- * This method will create the pipe and return a reference
- * to the top of the pipeline.
- *
- * The application can then use this (top most) phase and all
- * calls will propogated down the pipe.
- *
- * Simillar calls orginating at the bottom of the pipeline
- * will be propogated to the top.
- *
- * @param ctx
- * @return
- * @throws AMQPException
- */
- public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
- {
- String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE;
- Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
- List<String> list = ClientConfiguration.get().getList(key);
- int index = 0;
- for(String s:list)
+ /**
+ * This method will create the pipe and return a reference
+ * to the top of the pipeline.
+ *
+ * The application can then use this (top most) phase and all
+ * calls will propogated down the pipe.
+ *
+ * Simillar calls orginating at the bottom of the pipeline
+ * will be propogated to the top.
+ *
+ * @param ctx
+ * @return
+ * @throws AMQPException
+ */
+ public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
{
- try
- {
- Phase temp = (Phase)Class.forName(s).newInstance();
- phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX),temp) ;
- }
- catch(Exception e)
- {
- throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
- }
- index++;
- }
-
- Phase current = null;
- Phase prev = null;
- Phase next = null;
- //Lets build the phase pipe.
- for (int i=0; i<phaseMap.size();i++)
- {
- current = phaseMap.get(i);
- if (i+1 < phaseMap.size())
- {
- next = phaseMap.get(i+1);
- }
- current.init(ctx, next, prev);
- prev = current;
- next = null;
+ String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE;
+ Map<Integer, Phase> phaseMap = new HashMap<Integer, Phase>();
+ List<String> list = ClientConfiguration.get().getList(key);
+ int index = 0;
+ for (String s : list)
+ {
+ try
+ {
+ Phase temp = (Phase) Class.forName(s).newInstance();
+ phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX), temp);
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s), e);
+ }
+ index++;
+ }
+
+ Phase current = null;
+ Phase prev = null;
+ Phase next = null;
+ //Lets build the phase pipe.
+ for (int i = 0; i < phaseMap.size(); i++)
+ {
+ current = phaseMap.get(i);
+ if (i + 1 < phaseMap.size())
+ {
+ next = phaseMap.get(i + 1);
+ }
+ current.init(ctx, next, prev);
+ prev = current;
+ next = null;
+ }
+
+ return current;
}
-
- return current;
- }
} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
index fe0231522c..57ce513267 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
@@ -70,7 +70,14 @@ public class QpidMessageConsumerImpl extends AbstractResource implements QpidMes
public AMQPApplicationMessage receive()throws QpidException
{
checkClosed();
- return _queue.poll();
+ try
+ {
+ return _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new QpidException("Error occurred while retrieving message",e);
+ }
}
public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
index 91298dcc57..c20a76a247 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
@@ -82,8 +82,8 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes
msgHeaders.getContentType(), //contentType
msgHeaders.getCorrelationId(), //correlationId
msgHeaders.getDeliveryMode(), //deliveryMode non persistant
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+ msgHeaders.getDestination(),// destination
+ msgHeaders.getExchange(),// exchange
msgHeaders.getExpiration(), //expiration
msgHeaders.isImmediate(), //immediate
msgHeaders.isMandatory(), //mandatory