summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-07-24 00:35:26 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-07-24 00:35:26 +0000
commit586e63b99de7711689b0728d7a0c20354256c8dc (patch)
tree0559ed6348a880f8ce979a951eaff56ecab62d19
parent42238d6f0a49bd9311229752c07278329b90e05c (diff)
downloadqpid-python-586e63b99de7711689b0728d7a0c20354256c8dc.tar.gz
adding synapse exchange
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@558903 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/pom.xml77
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java19
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java258
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java74
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java104
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java62
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java9
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java176
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java4
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java100
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java9
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java4
18 files changed, 829 insertions, 87 deletions
diff --git a/java/broker/pom.xml b/java/broker/pom.xml
index 2cf8a563f0..dbb3ca3e19 100644
--- a/java/broker/pom.xml
+++ b/java/broker/pom.xml
@@ -34,14 +34,89 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
+ <!-- Synapse and related components -->
+ <synapse.version>1.0</synapse.version>
+ <stax.api.version>1.0.1</stax.api.version>
+ <activation.version>1.1</activation.version>
+
+ <!-- Axis2 1.2 and its dependencies -->
+ <axis2.version>1.2</axis2.version>
+ <axiom.version>1.2.4</axiom.version>
+ <xml_schema.version>1.3.1</xml_schema.version>
+ <xml_apis.version>1.3.03</xml_apis.version>
</properties>
<dependencies>
+
+ <dependency>
+ <groupId>org.apache.axis2</groupId>
+ <artifactId>axis2-kernel</artifactId>
+ <version>${axis2.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.synapse</groupId>
+ <artifactId>synapse-core</artifactId>
+ <version>${synapse.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.synapse</groupId>
+ <artifactId>synapse-extensions</artifactId>
+ <version>${synapse.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ws.commons.axiom</groupId>
+ <artifactId>axiom-api</artifactId>
+ <version>${axiom.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ws.commons.axiom</groupId>
+ <artifactId>axiom-impl</artifactId>
+ <version>${axiom.version}</version>
+ </dependency>
+ <!-- <dependency>
+ <groupId>org.apache.ws.commons.axiom</groupId>
+ <artifactId>axiom-dom</artifactId>
+ <version>${axiom.version}</version>
+ </dependency>
+ -->
+
+ <dependency>
+ <groupId>org.apache.ws.commons.schema</groupId>
+ <artifactId>XmlSchema</artifactId>
+ <version>${xml_schema.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>xml-apis</groupId>
+ <artifactId>xml-apis</artifactId>
+ <version>${xml_apis.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.codehaus.woodstox</groupId>
+ <artifactId>wstx-asl</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>stax</groupId>
+ <artifactId>stax-api</artifactId>
+ <version>${stax.api.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.activation</groupId>
+ <artifactId>activation</artifactId>
+ <version>${activation.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-common</artifactId>
- </dependency>
+ </dependency>
<dependency>
<groupId>commons-cli</groupId>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 204b5674ce..f0cf8d37ab 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -94,7 +94,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
if (exchange == null)
{
- exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
+ exchange = _exchangeFactory.createExchange(_exchangeRegistry,new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
_exchangeRegistry.registerExchange(exchange);
}
else
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 8b4f41a7a0..579ddf64d7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -126,7 +126,7 @@ public abstract class AbstractExchange implements Exchange, Managable
*/
protected abstract ExchangeMBean createMBean() throws AMQException;
- public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException
+ public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws AMQException
{
_virtualHost = host;
_name = name;
@@ -134,7 +134,10 @@ public abstract class AbstractExchange implements Exchange, Managable
_autoDelete = autoDelete;
_ticket = ticket;
_exchangeMbean = createMBean();
- _exchangeMbean.register();
+ if(_exchangeMbean != null)
+ {
+ _exchangeMbean.register();
+ }
}
public boolean isDurable()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 86feb46bb6..db78555b0e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -20,19 +20,15 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchangeFactory implements ExchangeFactory
{
@@ -48,9 +44,12 @@ public class DefaultExchangeFactory implements ExchangeFactory
_exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class);
_exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class);
_exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.FanoutExchange.class);
+
+ // I'd rather allow an extention mechanism to register custom exchanges. for standard default exchanges this is fine.
+ _exchangeClassMap.put(new AMQShortString("synapse"), org.apache.qpid.server.exchange.synapse.SynapseExchange.class);
}
- public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
+ public Exchange createExchange(ExchangeRegistry exchangeRegistry,AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
int ticket)
throws AMQException
{
@@ -62,7 +61,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
try
{
Exchange e = exchClass.newInstance();
- e.initialise(_host, exchange, durable, ticket, autoDelete);
+ e.initialise(_host, exchange, durable, ticket, autoDelete, exchangeRegistry);
return e;
}
catch (InstantiationException e)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index c012a1c1c9..084811df09 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -32,7 +32,7 @@ public interface Exchange
AMQShortString getName();
AMQShortString getType();
- void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
+ void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws AMQException;
boolean isDurable();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
index e07fd0b8fc..7b57a860e4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
@@ -26,7 +26,7 @@ import org.apache.qpid.framing.AMQShortString;
public interface ExchangeFactory
{
- Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
+ Exchange createExchange(ExchangeRegistry exchangeRegistry, AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
int ticket)
throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java
new file mode 100644
index 0000000000..c7887ed99b
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.exchange.synapse;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+
+import javax.activation.DataHandler;
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.attachments.ByteArrayDataSource;
+import org.apache.axiom.om.OMDocument;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMText;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.om.util.StAXUtils;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+
+/**
+ * The MessageContext needs to be set up and then is used by the SynapseMessageReceiver to inject messages.
+ * This class is used by the SynapseMessageReceiver to find the environment. The env is stored in a Parameter to the Axis2 config
+ */
+public class MessageContextCreatorForQpid{
+
+ private static Log log = LogFactory.getLog(MessageContextCreatorForQpid.class);
+
+ private static SynapseConfiguration synCfg = null;
+ private static SynapseEnvironment synEnv = null;
+
+ final static String ORIGINAL_MESSAGE = "ORIGINAL_MESSAGE";
+ final static String AMQP_CONTENT_TYPE = "AMQP_CONTENT_TYPE";
+ final static String DEFAULT_CHAR_SET_ENCODING = "UTF-8";
+
+ enum ContentType
+ {
+ TEXT_PLAIN ("text/plain"),
+ TEXT_XML ("text/xml"),
+ APPLICATION_OCTECT ("application/octet-stream");
+
+ private final String _value;
+
+ private ContentType (String value)
+ {
+ _value = value;
+ }
+
+ public String value()
+ {
+ return _value;
+ }
+ }
+
+ private static String createURL(String exchangeName,String routingKey)
+ {
+ StringBuffer buf = new StringBuffer();
+ buf.append("amqp://");
+ buf.append(exchangeName);
+ buf.append("?");
+ buf.append("routingKey=");
+ buf.append(routingKey);
+
+ return buf.toString();
+ }
+
+ public static MessageContext getSynapseMessageContext(AMQMessage amqMsg) throws SynapseException {
+
+ if (synCfg == null || synEnv == null) {
+ String msg = "Synapse environment has not initialized properly..";
+ log.fatal(msg);
+ throw new SynapseException(msg);
+ }
+
+ org.apache.axis2.context.MessageContext axis2MC = new org.apache.axis2.context.MessageContext();
+ Axis2MessageContext synCtx = new Axis2MessageContext(axis2MC, synCfg, synEnv);
+ synCtx.setMessageID(amqMsg.getTransferBody().getMessageId().asString());
+ if(amqMsg.getTransferBody().getCorrelationId() != null)
+ {
+ synCtx.setRelatesTo(new RelatesTo[]{new RelatesTo(amqMsg.getTransferBody().getCorrelationId().asString())});
+ }
+ synCtx.setTo(new EndpointReference(createURL(amqMsg.getTransferBody().getExchange().asString(),amqMsg.getTransferBody().getRoutingKey().asString())));
+
+ if(amqMsg.getTransferBody().getReplyTo() != null)
+ {
+ synCtx.setReplyTo(new EndpointReference(createURL(amqMsg.getTransferBody().getExchange().asString(),amqMsg.getTransferBody().getReplyTo().asString())));
+ }
+ synCtx.setDoingPOX(true);
+ synCtx.setProperty(ORIGINAL_MESSAGE, amqMsg);
+
+ //Creating a fictitious SOAP envelope to support the synapse model
+
+ SOAPFactory soapFactory = new SOAP11Factory();
+ SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
+
+ String contentType = amqMsg.getTransferBody().getContentType().asString();
+ if(ContentType.TEXT_PLAIN.value().equals(contentType))
+ {
+ OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null);
+ OMText textData = soapFactory.createOMText(amqMsg.getTransferBody().getBody().getContentAsString());
+ wrapper.addChild(textData);
+ envelope.getBody().addChild(wrapper);
+ }
+ else if (ContentType.TEXT_XML.value().equals(contentType))
+ {
+ XMLStreamReader parser;
+ try
+ {
+ parser = StAXUtils.createXMLStreamReader(
+ new ByteArrayInputStream(amqMsg.getTransferBody().getBody().getContentAsByteArray()),
+ DEFAULT_CHAR_SET_ENCODING);
+ }
+ catch (XMLStreamException e)
+ {
+ throw new SynapseException("Error reading the XML message",e);
+ }
+
+ StAXOMBuilder builder = new StAXOMBuilder(parser);
+ //builder.setOMBuilderFactory(soapFactory);
+
+ Object obj = builder.getDocumentElement();
+ envelope.getBody().addChild(builder.getDocumentElement());
+ }
+ else if (ContentType.APPLICATION_OCTECT.value().equals(contentType))
+ {
+ // treat binary data as an attachment
+ DataHandler dataHandler = new DataHandler(
+ new ByteArrayDataSource(amqMsg.getTransferBody().getBody().getContentAsByteArray()));
+ OMText textData = soapFactory.createOMText(dataHandler, true);
+ OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null);
+ wrapper.addChild(textData);
+ synCtx.setDoingMTOM(true);
+
+ envelope.getBody().addChild(wrapper);
+ }
+ else
+ {
+ throw new SynapseException("Unsupported Content Type : " + contentType);
+ }
+
+ synCtx.setProperty(AMQP_CONTENT_TYPE, contentType);
+
+ try
+ {
+ synCtx.setEnvelope(envelope);
+ }
+ catch(AxisFault e)
+ {
+ throw new SynapseException(e);
+ }
+
+ return synCtx;
+ }
+
+ public static AMQMessage getAMQMessage(MessageContext mc)
+ {
+ AMQMessage origMsg = (AMQMessage)mc.getProperty(ORIGINAL_MESSAGE);
+ OMElement payload = mc.getEnvelope().getBody().getFirstElement();
+
+ String amqContentType = (String)mc.getProperty(AMQP_CONTENT_TYPE);
+ byte[] content = new byte[0];
+
+ if(ContentType.TEXT_PLAIN.value().equals(amqContentType))
+ {
+ // For plain text there was a wrapper element
+ content = payload.getText().getBytes();
+ }
+ else if (ContentType.TEXT_XML.value().equals(amqContentType))
+ {
+ content = payload.getText().getBytes();
+ }
+ else if (ContentType.APPLICATION_OCTECT.value().equals(amqContentType) && mc.isDoingMTOM())
+ {
+
+ }
+
+ String url = mc.getTo().getAddress();;
+ // very crude
+ // should have utility class to do this, but do it when amqp
+ // officialy converge on an addressing scheme
+ String exchangeName = url.substring(7,url.indexOf('?'));
+ String routingKey = url.substring(url.indexOf('=')+1,url.length());
+
+
+ MessageTransferBody origTransferBody = origMsg.getTransferBody();
+ MessageTransferBody transferBody = MessageTransferBody.createMethodBody(
+ origTransferBody.getMajor(),
+ origTransferBody.getMinor(),
+ origTransferBody.getAppId(), //appId
+ origTransferBody.getApplicationHeaders(), //applicationHeaders
+ new Content(Content.TypeEnum.INLINE_T, content), //body
+ origTransferBody.getContentType(), //contentEncoding,
+ origTransferBody.getContentType(), //contentType
+ origTransferBody.getCorrelationId(), //correlationId
+ origTransferBody.getDeliveryMode(), //deliveryMode non persistant
+ new AMQShortString(exchangeName),// destination
+ new AMQShortString(exchangeName),// exchange
+ origTransferBody.getExpiration(), //expiration
+ origTransferBody.getImmediate(), //immediate
+ origTransferBody.getMandatory(), //mandatory
+ origTransferBody.getMessageId(), //messageId
+ origTransferBody.getPriority(), //priority
+ origTransferBody.getRedelivered(), //redelivered
+ origTransferBody.getReplyTo(), //replyTo
+ new AMQShortString(routingKey), //routingKey,
+ "abc".getBytes(), //securityToken
+ origTransferBody.ticket, //ticket
+ System.currentTimeMillis(), //timestamp
+ origTransferBody.getTransactionId(), //transactionId
+ origTransferBody.getTtl(), //ttl,
+ origTransferBody.getUserId() //userId
+ );
+ AMQMessage newMsg = new AMQMessage(origMsg.getMessageStore(),transferBody,origMsg.getTransactionContext());
+
+ return newMsg;
+ }
+
+ public static void setSynConfig(SynapseConfiguration synCfg) {
+ MessageContextCreatorForQpid.synCfg = synCfg;
+ }
+
+ public static void setSynEnv(SynapseEnvironment synEnv) {
+ MessageContextCreatorForQpid.synEnv = synEnv;
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java
new file mode 100644
index 0000000000..60fdc40788
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java
@@ -0,0 +1,74 @@
+package org.apache.qpid.server.exchange.synapse;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.endpoints.utils.EndpointDefinition;
+import org.apache.synapse.statistics.StatisticsCollector;
+
+public class QpidSynapseEnvironment implements SynapseEnvironment
+{
+
+ private static final Log log = LogFactory.getLog(QpidSynapseEnvironment.class);
+
+ private SynapseConfiguration synapseConfig;
+
+ private StatisticsCollector statisticsCollector;
+
+ private SynapseExchange qpidExchange;
+
+ public QpidSynapseEnvironment(SynapseConfiguration synapseConfig, SynapseExchange qpidExchange)
+ {
+ this.synapseConfig = synapseConfig;
+ this.qpidExchange = qpidExchange;
+ }
+
+ public MessageContext createMessageContext()
+ {
+ org.apache.axis2.context.MessageContext axis2MC = new org.apache.axis2.context.MessageContext();
+ MessageContext mc = new Axis2MessageContext(axis2MC, synapseConfig, this);
+ return mc;
+ }
+
+ public StatisticsCollector getStatisticsCollector()
+ {
+ return statisticsCollector;
+ }
+
+ public void injectMessage(MessageContext synCtx)
+ {
+
+ synCtx.getMainSequence().mediate(synCtx);
+ }
+
+ public void send(EndpointDefinition endpoint, MessageContext smc)
+ {
+ if(endpoint != null)
+ {
+ smc.setTo(new EndpointReference(endpoint.getAddress()));
+ AMQMessage newMessage = MessageContextCreatorForQpid.getAMQMessage(smc);
+ try
+ {
+ qpidExchange.getExchangeRegistry().routeContent(newMessage);
+ }
+ catch(Exception e)
+ {
+ throw new SynapseException("Faulty endpoint",e);
+ }
+ }
+
+ }
+
+ public void setStatisticsCollector(StatisticsCollector statisticsCollector)
+ {
+ this.statisticsCollector = statisticsCollector;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java
new file mode 100644
index 0000000000..c408529fbd
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java
@@ -0,0 +1,104 @@
+package org.apache.qpid.server.exchange.synapse;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.synapse.Constants;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.config.SynapseConfigurationBuilder;
+import org.apache.synapse.core.SynapseEnvironment;
+
+public class SynapseExchange extends AbstractExchange
+{
+
+ public final static AMQShortString TYPE = new AMQShortString("synapse");
+
+ private SynapseEnvironment synEnv;
+
+ private ExchangeRegistry exchangeRegistry;
+
+ public SynapseExchange()
+ {
+ super();
+ }
+
+ @Override
+ public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws AMQException
+ {
+ super.initialise(host, name, durable, ticket, autoDelete, exchangeRegistry);
+
+ String config = System.getProperty(Constants.SYNAPSE_XML);
+ SynapseConfiguration synapseConfiguration = SynapseConfigurationBuilder.getConfiguration(config);
+ synEnv = new QpidSynapseEnvironment(synapseConfiguration,this);
+ MessageContextCreatorForQpid.setSynConfig(synapseConfiguration);
+ MessageContextCreatorForQpid.setSynEnv(synEnv);
+ this.exchangeRegistry = exchangeRegistry;
+ }
+
+ @Override
+ protected ExchangeMBean createMBean() throws AMQException
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ {
+ throw new UnsupportedOperationException("This exchange does not take bindings");
+ }
+
+ public AMQShortString getType()
+ {
+ return TYPE;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return false;
+ }
+
+ public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
+ {
+ throw new UnsupportedOperationException("This exchange does not take bindings");
+ }
+
+ public boolean isBound(AMQShortString routingKey) throws AMQException
+ {
+ throw new UnsupportedOperationException("This exchange does not take bindings");
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ throw new UnsupportedOperationException("This exchange does not take bindings");
+ }
+
+ public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ throw new UnsupportedOperationException("This exchange does not take bindings");
+ }
+
+ public void route(AMQMessage message) throws AMQException
+ {
+ try
+ {
+ MessageContext mc = MessageContextCreatorForQpid.getSynapseMessageContext(message);
+ synEnv.injectMessage(mc);
+ }
+ catch(Exception e)
+ {
+ throw new AMQException("Error occurred while trying to mediate message through Synapse",e);
+ }
+ }
+
+ public ExchangeRegistry getExchangeRegistry()
+ {
+ return exchangeRegistry;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java
new file mode 100644
index 0000000000..09ef377ef7
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java
@@ -0,0 +1,62 @@
+package org.apache.qpid.server.exchange.synapse;
+
+import javax.activation.DataHandler;
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.attachments.ByteArrayDataSource;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMText;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+
+public class TestClassMediator implements Mediator
+{
+
+ public int getTraceState()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public String getType()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public boolean mediate(MessageContext mc)
+ {
+ SOAPFactory soapFactory = new SOAP11Factory();
+ OMElement binaryNode = mc.getEnvelope().getBody().getFirstChildWithName(new QName("payload"));
+ byte[] source = binaryNode.getText().getBytes();
+
+ byte[] b = new byte[source.length];
+ int j = 0;
+ for(int i=source.length-1; i>0; i--)
+ {
+ b[j] = source[i];
+ j++;
+ }
+
+ mc.getEnvelope().getBody().getFirstChildWithName(new QName("payload")).detach();
+
+ DataHandler dataHandler = new DataHandler(
+ new ByteArrayDataSource(b));
+ OMText textData = soapFactory.createOMText(dataHandler, true);
+ OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null);
+ wrapper.addChild(textData);
+ mc.setDoingMTOM(true);
+
+ mc.getEnvelope().getBody().addChild(wrapper);
+ return true;
+ }
+
+ public void setTraceState(int arg0)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
index 7b129f0187..067eef5003 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
@@ -79,7 +79,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
try
{
- exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+ exchange = exchangeFactory.createExchange(exchangeRegistry,body.exchange, body.type, body.durable,
body.passive, body.ticket);
exchangeRegistry.registerExchange(exchange);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
index 8b5f05e8ea..df2d840bc9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
@@ -34,12 +34,15 @@ public class ExchangeInitialiser
define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+ //There should be an extention mechanism to register
+ define(registry,factory,new AMQShortString("amq.synapse"),new AMQShortString("synapse"));
+
registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
}
private void define(ExchangeRegistry r, ExchangeFactory f,
AMQShortString name, AMQShortString type) throws AMQException
{
- r.registerExchange(f.createExchange(name, type, true, false, 0));
+ r.registerExchange(f.createExchange(r,name, type, true, false, 0));
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 711e045516..59f88e2f43 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -643,4 +643,13 @@ public class AMQMessage
return _requestId;
}
+ public MessageStore getMessageStore()
+ {
+ return _store;
+ }
+
+ public TransactionalContext getTransactionContext()
+ {
+ return _txnContext;
+ }
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
index bde2a7bfea..9aea71f3bb 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
@@ -27,24 +27,13 @@ public class QpidTestClient
QpidExchangeHelper exchangeHelper = session.getExchangeHelper();
exchangeHelper.declareExchange(false, false, QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, QpidConstants.DIRECT_EXCHANGE_CLASS);
- QpidQueueHelper queueHelper = session.getQueueHelper();
- queueHelper.declareQueue(false, false, false, false, false, "myQueue");
- queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", "RH");
+ exchangeHelper.declareExchange(false, false, QpidConstants.SYNAPSE_EXCHANGE_NAME, false, false, false, QpidConstants.SYNAPSE_EXCHANGE_CLASS);
+
+ //contentBasedRoutingSample(session);
- MessageHeaders msgHeaders = new MessageHeaders();
- msgHeaders.setRoutingKey(new AMQShortString("RH"));
- msgHeaders.setExchange(new AMQShortString(QpidConstants.DIRECT_EXCHANGE_NAME));
- AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,"test".getBytes());
+ //transformationSample2(session);
- QpidMessageProducer messageProducer = session.createProducer();
- messageProducer.open();
- messageProducer.send(false, true, msg);
-
- QpidMessageConsumer messageConsumer = session.createConsumer("myQueue", false, false);
- messageConsumer.open();
-
- AMQPApplicationMessage msg2 = messageConsumer.receive();
- System.out.println(msg.toString());
+ binaryMessageTransformations(session);
}
catch(Exception e)
{
@@ -53,4 +42,159 @@ public class QpidTestClient
}
+ public static void contentBasedRoutingSample(QpidSession session) throws Exception
+ {
+ String tmp = "<m:troubleTicket xmlns:m=\"http://redhat.com/sample\"><m:customerId>532535</m:customerId><m:priority>";
+ String tmp2 = "</m:priority><m:appId>ESB</m:appId><m:desc>blabla</m:desc></m:troubleTicket>";
+
+ //Create queues
+ QpidQueueHelper queueHelper = session.getQueueHelper();
+ queueHelper.declareQueue(false, false, false, false, false, "criticalTicketQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "criticalTicketQueue", "criticalTicket");
+
+ queueHelper.declareQueue(false, false, false, false, false, "lowPriorityTicketQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "lowPriorityTicketQueue", "lowPriorityTicket");
+
+ queueHelper.declareQueue(false, false, false, false, false, "ticketQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "ticketQueue", "troubleTicket");
+
+ QpidMessageProducer messageProducer = session.createProducer();
+ messageProducer.open();
+
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setContentType(new AMQShortString("text/xml"));
+ msgHeaders.setRoutingKey(new AMQShortString("defectSystem"));
+ msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+ msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+ StringBuffer buf = new StringBuffer();
+ buf.append(tmp).append("critical").append(tmp2);
+ AMQPApplicationMessage criticalMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+ System.out.println(criticalMsg.toString());
+ messageProducer.send(false, true, criticalMsg);
+
+ buf = new StringBuffer();
+ buf.append(tmp).append("low").append(tmp2);
+ AMQPApplicationMessage lowMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+ System.out.println(lowMsg.toString());
+ messageProducer.send(false, true, lowMsg);
+
+ buf = new StringBuffer();
+ buf.append(tmp).append("high").append(tmp2);
+ AMQPApplicationMessage highMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+ System.out.println(highMsg.toString());
+ messageProducer.send(false, true, highMsg);
+
+ QpidMessageConsumer messageConsumerCritical = session.createConsumer("criticalTicketQueue", false, false);
+ messageConsumerCritical.open();
+ AMQPApplicationMessage criticalMsgRcv = messageConsumerCritical.receive();
+ System.out.println(criticalMsgRcv.toString());
+
+ QpidMessageConsumer messageConsumerLow = session.createConsumer("lowPriorityTicketQueue", false, false);
+ messageConsumerLow.open();
+ AMQPApplicationMessage lowMsgRcv = messageConsumerLow.receive();
+ System.out.println(lowMsgRcv.toString());
+
+ QpidMessageConsumer messageConsumer = session.createConsumer("ticketQueue", false, false);
+ messageConsumer.open();
+ AMQPApplicationMessage msgRcv = messageConsumer.receive();
+ System.out.println(msgRcv.toString());
+
+ }
+
+ public static void transformationSample(QpidSession session) throws Exception
+ {
+ String tmp = "<m:quote xmlns:m=\"http://redhat.com/sample\"><m:ticker>RHT</m:ticker><m:value>125</m:value></m:quote>";
+
+ QpidQueueHelper queueHelper = session.getQueueHelper();
+ queueHelper.declareQueue(false, false, false, false, false, "stockQuoteQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "stockQuoteQueue", "stockQuote");
+
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setContentType(new AMQShortString("text/xml"));
+ msgHeaders.setRoutingKey(new AMQShortString("stockQuote"));
+ msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+ msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,tmp.getBytes());
+
+ QpidMessageProducer messageProducer = session.createProducer();
+ messageProducer.open();
+ System.out.println(msg.toString());
+ messageProducer.send(false, true, msg);
+ }
+
+ public static void transformationSample2(QpidSession session) throws Exception
+ {
+ StringBuffer buf = new StringBuffer();
+ buf.append("<m:vacationPackage xmlns:m=\"http://redhat.com/sample\">");
+ buf.append("<m:customerFirstName>Rajith</m:customerFirstName>");
+ buf.append("<m:customerLastName>Rajith</m:customerLastName>");
+ buf.append("<m:customerAddress>3349 Missississauga Road,Mississauga,ON, L5L 1J7</m:customerAddress>");
+ buf.append("<m:customerDOB>Mississauga</m:customerDOB>");
+ buf.append("<m:paymentInfo>Visa,456454574575325325235,05122007</m:paymentInfo>");
+ buf.append("<m:start>12072007</m:start>");
+ buf.append("<m:end>18072007</m:end>");
+ buf.append("<m:airTicket>");
+ buf.append("<m:airline>AC</m:airline>");
+ buf.append("<m:seatPreference>W</m:seatPreference>");
+ buf.append("<m:frequentFlyer>643663345</m:frequentFlyer>");
+ buf.append("</m:airTicket>");
+ buf.append("<m:Hotel>");
+ buf.append("<m:noOfDays>5</m:noOfDays>");
+ buf.append("<m:rating>5</m:rating>");
+ buf.append("<m:meals>AI</m:meals>");
+ buf.append("</m:Hotel>");
+ buf.append("<m:carRental>");
+ buf.append("<m:from>14062007</m:from>");
+ buf.append("<m:to>16062007</m:to>");
+ buf.append("</m:carRental>");
+ buf.append("</m:vacationPackage>");
+
+ QpidQueueHelper queueHelper = session.getQueueHelper();
+ queueHelper.declareQueue(false, false, false, false, false, "carRentalQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "carRentalQueue", "carRental");
+
+ queueHelper.declareQueue(false, false, false, false, false, "hotelQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "hotelQueue", "hotel");
+
+ queueHelper.declareQueue(false, false, false, false, false, "airlineQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "airlineQueue", "airline");
+
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setContentType(new AMQShortString("text/xml"));
+ msgHeaders.setRoutingKey(new AMQShortString("vacationPackage"));
+ msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+ msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes());
+
+ QpidMessageProducer messageProducer = session.createProducer();
+ messageProducer.open();
+ System.out.println(msg.toString());
+ messageProducer.send(false, true, msg);
+ }
+
+ public static void binaryMessageTransformations(QpidSession session) throws Exception
+ {
+ QpidQueueHelper queueHelper = session.getQueueHelper();
+ queueHelper.declareQueue(false, false, false, false, false, "binaryQueue");
+ queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "binaryQueue", "binary");
+
+ MessageHeaders msgHeaders = new MessageHeaders();
+ msgHeaders.setContentType(new AMQShortString("application/octet-stream"));
+ msgHeaders.setRoutingKey(new AMQShortString("binary"));
+ msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+ msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+ byte[] buf = new byte[]{72,101,108,108,111};
+
+ AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf);
+
+ QpidMessageProducer messageProducer = session.createProducer();
+ messageProducer.open();
+ System.out.println(msg.toString());
+ messageProducer.send(false, true, msg);
+ }
+
}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
index 96103509cf..5001d50548 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
@@ -22,6 +22,10 @@ public class QpidConstants
public final static String FANOUT_EXCHANGE_CLASS = "fanout";
+ public final static String SYNAPSE_EXCHANGE_NAME = "amq.synapse";
+
+ public final static String SYNAPSE_EXCHANGE_CLASS = "synapse";
+
public final static String SYSTEM_MANAGEMENT_EXCHANGE_NAME = "qpid.sysmgmt";
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
index f14825ed31..5725de9caa 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
@@ -8,56 +8,56 @@ import org.apache.qpid.nclient.config.ClientConfiguration;
public class PhaseFactory
{
- /**
- * This method will create the pipe and return a reference
- * to the top of the pipeline.
- *
- * The application can then use this (top most) phase and all
- * calls will propogated down the pipe.
- *
- * Simillar calls orginating at the bottom of the pipeline
- * will be propogated to the top.
- *
- * @param ctx
- * @return
- * @throws AMQPException
- */
- public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
- {
- String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE;
- Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
- List<String> list = ClientConfiguration.get().getList(key);
- int index = 0;
- for(String s:list)
+ /**
+ * This method will create the pipe and return a reference
+ * to the top of the pipeline.
+ *
+ * The application can then use this (top most) phase and all
+ * calls will propogated down the pipe.
+ *
+ * Simillar calls orginating at the bottom of the pipeline
+ * will be propogated to the top.
+ *
+ * @param ctx
+ * @return
+ * @throws AMQPException
+ */
+ public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
{
- try
- {
- Phase temp = (Phase)Class.forName(s).newInstance();
- phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX),temp) ;
- }
- catch(Exception e)
- {
- throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
- }
- index++;
- }
-
- Phase current = null;
- Phase prev = null;
- Phase next = null;
- //Lets build the phase pipe.
- for (int i=0; i<phaseMap.size();i++)
- {
- current = phaseMap.get(i);
- if (i+1 < phaseMap.size())
- {
- next = phaseMap.get(i+1);
- }
- current.init(ctx, next, prev);
- prev = current;
- next = null;
+ String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE;
+ Map<Integer, Phase> phaseMap = new HashMap<Integer, Phase>();
+ List<String> list = ClientConfiguration.get().getList(key);
+ int index = 0;
+ for (String s : list)
+ {
+ try
+ {
+ Phase temp = (Phase) Class.forName(s).newInstance();
+ phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX), temp);
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s), e);
+ }
+ index++;
+ }
+
+ Phase current = null;
+ Phase prev = null;
+ Phase next = null;
+ //Lets build the phase pipe.
+ for (int i = 0; i < phaseMap.size(); i++)
+ {
+ current = phaseMap.get(i);
+ if (i + 1 < phaseMap.size())
+ {
+ next = phaseMap.get(i + 1);
+ }
+ current.init(ctx, next, prev);
+ prev = current;
+ next = null;
+ }
+
+ return current;
}
-
- return current;
- }
} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
index fe0231522c..57ce513267 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
@@ -70,7 +70,14 @@ public class QpidMessageConsumerImpl extends AbstractResource implements QpidMes
public AMQPApplicationMessage receive()throws QpidException
{
checkClosed();
- return _queue.poll();
+ try
+ {
+ return _queue.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new QpidException("Error occurred while retrieving message",e);
+ }
}
public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
index 91298dcc57..c20a76a247 100644
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
@@ -82,8 +82,8 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes
msgHeaders.getContentType(), //contentType
msgHeaders.getCorrelationId(), //correlationId
msgHeaders.getDeliveryMode(), //deliveryMode non persistant
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+ msgHeaders.getDestination(),// destination
+ msgHeaders.getExchange(),// exchange
msgHeaders.getExpiration(), //expiration
msgHeaders.isImmediate(), //immediate
msgHeaders.isMandatory(), //mandatory