diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-07-24 00:35:26 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-07-24 00:35:26 +0000 |
commit | 586e63b99de7711689b0728d7a0c20354256c8dc (patch) | |
tree | 0559ed6348a880f8ce979a951eaff56ecab62d19 | |
parent | 42238d6f0a49bd9311229752c07278329b90e05c (diff) | |
download | qpid-python-586e63b99de7711689b0728d7a0c20354256c8dc.tar.gz |
adding synapse exchange
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@558903 13f79535-47bb-0310-9956-ffa450edef68
18 files changed, 829 insertions, 87 deletions
diff --git a/java/broker/pom.xml b/java/broker/pom.xml index 2cf8a563f0..dbb3ca3e19 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -34,14 +34,89 @@ <properties> <topDirectoryLocation>..</topDirectoryLocation> + <!-- Synapse and related components --> + <synapse.version>1.0</synapse.version> + <stax.api.version>1.0.1</stax.api.version> + <activation.version>1.1</activation.version> + + <!-- Axis2 1.2 and its dependencies --> + <axis2.version>1.2</axis2.version> + <axiom.version>1.2.4</axiom.version> + <xml_schema.version>1.3.1</xml_schema.version> + <xml_apis.version>1.3.03</xml_apis.version> </properties> <dependencies> + + <dependency> + <groupId>org.apache.axis2</groupId> + <artifactId>axis2-kernel</artifactId> + <version>${axis2.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.synapse</groupId> + <artifactId>synapse-core</artifactId> + <version>${synapse.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.synapse</groupId> + <artifactId>synapse-extensions</artifactId> + <version>${synapse.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ws.commons.axiom</groupId> + <artifactId>axiom-api</artifactId> + <version>${axiom.version}</version> + </dependency> + <dependency> + <groupId>org.apache.ws.commons.axiom</groupId> + <artifactId>axiom-impl</artifactId> + <version>${axiom.version}</version> + </dependency> + <!-- <dependency> + <groupId>org.apache.ws.commons.axiom</groupId> + <artifactId>axiom-dom</artifactId> + <version>${axiom.version}</version> + </dependency> + --> + + <dependency> + <groupId>org.apache.ws.commons.schema</groupId> + <artifactId>XmlSchema</artifactId> + <version>${xml_schema.version}</version> + </dependency> + + <dependency> + <groupId>xml-apis</groupId> + <artifactId>xml-apis</artifactId> + <version>${xml_apis.version}</version> + </dependency> + + <dependency> + <groupId>org.codehaus.woodstox</groupId> + <artifactId>wstx-asl</artifactId> + <version>3.2.1</version> + </dependency> + + <dependency> + <groupId>stax</groupId> + <artifactId>stax-api</artifactId> + <version>${stax.api.version}</version> + </dependency> + + <dependency> + <groupId>javax.activation</groupId> + <artifactId>activation</artifactId> + <version>${activation.version}</version> + </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-common</artifactId> - </dependency> + </dependency> <dependency> <groupId>commons-cli</groupId> diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 204b5674ce..f0cf8d37ab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -94,7 +94,7 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName)); if (exchange == null) { - exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0); + exchange = _exchangeFactory.createExchange(_exchangeRegistry,new AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0); _exchangeRegistry.registerExchange(exchange); } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 8b4f41a7a0..579ddf64d7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -126,7 +126,7 @@ public abstract class AbstractExchange implements Exchange, Managable */ protected abstract ExchangeMBean createMBean() throws AMQException; - public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException + public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws AMQException { _virtualHost = host; _name = name; @@ -134,7 +134,10 @@ public abstract class AbstractExchange implements Exchange, Managable _autoDelete = autoDelete; _ticket = ticket; _exchangeMbean = createMBean(); - _exchangeMbean.register(); + if(_exchangeMbean != null) + { + _exchangeMbean.register(); + } } public boolean isDurable() diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 86feb46bb6..db78555b0e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -20,19 +20,15 @@ */ package org.apache.qpid.server.exchange; +import java.util.HashMap; +import java.util.Map; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQUnknownExchangeType; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; - -import java.util.HashMap; -import java.util.Map; -import org.apache.qpid.AMQUnknownExchangeType; -import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchangeFactory implements ExchangeFactory { @@ -48,9 +44,12 @@ public class DefaultExchangeFactory implements ExchangeFactory _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, org.apache.qpid.server.exchange.DestWildExchange.class); _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, org.apache.qpid.server.exchange.HeadersExchange.class); _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, org.apache.qpid.server.exchange.FanoutExchange.class); + + // I'd rather allow an extention mechanism to register custom exchanges. for standard default exchanges this is fine. + _exchangeClassMap.put(new AMQShortString("synapse"), org.apache.qpid.server.exchange.synapse.SynapseExchange.class); } - public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, + public Exchange createExchange(ExchangeRegistry exchangeRegistry,AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, int ticket) throws AMQException { @@ -62,7 +61,7 @@ public class DefaultExchangeFactory implements ExchangeFactory try { Exchange e = exchClass.newInstance(); - e.initialise(_host, exchange, durable, ticket, autoDelete); + e.initialise(_host, exchange, durable, ticket, autoDelete, exchangeRegistry); return e; } catch (InstantiationException e) diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index c012a1c1c9..084811df09 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -32,7 +32,7 @@ public interface Exchange AMQShortString getName(); AMQShortString getType(); - void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; + void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws AMQException; boolean isDurable(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index e07fd0b8fc..7b57a860e4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -26,7 +26,7 @@ import org.apache.qpid.framing.AMQShortString; public interface ExchangeFactory { - Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, + Exchange createExchange(ExchangeRegistry exchangeRegistry, AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, int ticket) throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java new file mode 100644 index 0000000000..c7887ed99b --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.exchange.synapse; + +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; + +import javax.activation.DataHandler; +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import org.apache.axiom.attachments.ByteArrayDataSource; +import org.apache.axiom.om.OMDocument; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMText; +import org.apache.axiom.om.impl.builder.StAXOMBuilder; +import org.apache.axiom.om.util.StAXUtils; +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axiom.soap.SOAPFactory; +import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory; +import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.addressing.RelatesTo; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseException; +import org.apache.synapse.config.SynapseConfiguration; +import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.core.axis2.Axis2MessageContext; + +/** + * The MessageContext needs to be set up and then is used by the SynapseMessageReceiver to inject messages. + * This class is used by the SynapseMessageReceiver to find the environment. The env is stored in a Parameter to the Axis2 config + */ +public class MessageContextCreatorForQpid{ + + private static Log log = LogFactory.getLog(MessageContextCreatorForQpid.class); + + private static SynapseConfiguration synCfg = null; + private static SynapseEnvironment synEnv = null; + + final static String ORIGINAL_MESSAGE = "ORIGINAL_MESSAGE"; + final static String AMQP_CONTENT_TYPE = "AMQP_CONTENT_TYPE"; + final static String DEFAULT_CHAR_SET_ENCODING = "UTF-8"; + + enum ContentType + { + TEXT_PLAIN ("text/plain"), + TEXT_XML ("text/xml"), + APPLICATION_OCTECT ("application/octet-stream"); + + private final String _value; + + private ContentType (String value) + { + _value = value; + } + + public String value() + { + return _value; + } + } + + private static String createURL(String exchangeName,String routingKey) + { + StringBuffer buf = new StringBuffer(); + buf.append("amqp://"); + buf.append(exchangeName); + buf.append("?"); + buf.append("routingKey="); + buf.append(routingKey); + + return buf.toString(); + } + + public static MessageContext getSynapseMessageContext(AMQMessage amqMsg) throws SynapseException { + + if (synCfg == null || synEnv == null) { + String msg = "Synapse environment has not initialized properly.."; + log.fatal(msg); + throw new SynapseException(msg); + } + + org.apache.axis2.context.MessageContext axis2MC = new org.apache.axis2.context.MessageContext(); + Axis2MessageContext synCtx = new Axis2MessageContext(axis2MC, synCfg, synEnv); + synCtx.setMessageID(amqMsg.getTransferBody().getMessageId().asString()); + if(amqMsg.getTransferBody().getCorrelationId() != null) + { + synCtx.setRelatesTo(new RelatesTo[]{new RelatesTo(amqMsg.getTransferBody().getCorrelationId().asString())}); + } + synCtx.setTo(new EndpointReference(createURL(amqMsg.getTransferBody().getExchange().asString(),amqMsg.getTransferBody().getRoutingKey().asString()))); + + if(amqMsg.getTransferBody().getReplyTo() != null) + { + synCtx.setReplyTo(new EndpointReference(createURL(amqMsg.getTransferBody().getExchange().asString(),amqMsg.getTransferBody().getReplyTo().asString()))); + } + synCtx.setDoingPOX(true); + synCtx.setProperty(ORIGINAL_MESSAGE, amqMsg); + + //Creating a fictitious SOAP envelope to support the synapse model + + SOAPFactory soapFactory = new SOAP11Factory(); + SOAPEnvelope envelope = soapFactory.getDefaultEnvelope(); + + String contentType = amqMsg.getTransferBody().getContentType().asString(); + if(ContentType.TEXT_PLAIN.value().equals(contentType)) + { + OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null); + OMText textData = soapFactory.createOMText(amqMsg.getTransferBody().getBody().getContentAsString()); + wrapper.addChild(textData); + envelope.getBody().addChild(wrapper); + } + else if (ContentType.TEXT_XML.value().equals(contentType)) + { + XMLStreamReader parser; + try + { + parser = StAXUtils.createXMLStreamReader( + new ByteArrayInputStream(amqMsg.getTransferBody().getBody().getContentAsByteArray()), + DEFAULT_CHAR_SET_ENCODING); + } + catch (XMLStreamException e) + { + throw new SynapseException("Error reading the XML message",e); + } + + StAXOMBuilder builder = new StAXOMBuilder(parser); + //builder.setOMBuilderFactory(soapFactory); + + Object obj = builder.getDocumentElement(); + envelope.getBody().addChild(builder.getDocumentElement()); + } + else if (ContentType.APPLICATION_OCTECT.value().equals(contentType)) + { + // treat binary data as an attachment + DataHandler dataHandler = new DataHandler( + new ByteArrayDataSource(amqMsg.getTransferBody().getBody().getContentAsByteArray())); + OMText textData = soapFactory.createOMText(dataHandler, true); + OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null); + wrapper.addChild(textData); + synCtx.setDoingMTOM(true); + + envelope.getBody().addChild(wrapper); + } + else + { + throw new SynapseException("Unsupported Content Type : " + contentType); + } + + synCtx.setProperty(AMQP_CONTENT_TYPE, contentType); + + try + { + synCtx.setEnvelope(envelope); + } + catch(AxisFault e) + { + throw new SynapseException(e); + } + + return synCtx; + } + + public static AMQMessage getAMQMessage(MessageContext mc) + { + AMQMessage origMsg = (AMQMessage)mc.getProperty(ORIGINAL_MESSAGE); + OMElement payload = mc.getEnvelope().getBody().getFirstElement(); + + String amqContentType = (String)mc.getProperty(AMQP_CONTENT_TYPE); + byte[] content = new byte[0]; + + if(ContentType.TEXT_PLAIN.value().equals(amqContentType)) + { + // For plain text there was a wrapper element + content = payload.getText().getBytes(); + } + else if (ContentType.TEXT_XML.value().equals(amqContentType)) + { + content = payload.getText().getBytes(); + } + else if (ContentType.APPLICATION_OCTECT.value().equals(amqContentType) && mc.isDoingMTOM()) + { + + } + + String url = mc.getTo().getAddress();; + // very crude + // should have utility class to do this, but do it when amqp + // officialy converge on an addressing scheme + String exchangeName = url.substring(7,url.indexOf('?')); + String routingKey = url.substring(url.indexOf('=')+1,url.length()); + + + MessageTransferBody origTransferBody = origMsg.getTransferBody(); + MessageTransferBody transferBody = MessageTransferBody.createMethodBody( + origTransferBody.getMajor(), + origTransferBody.getMinor(), + origTransferBody.getAppId(), //appId + origTransferBody.getApplicationHeaders(), //applicationHeaders + new Content(Content.TypeEnum.INLINE_T, content), //body + origTransferBody.getContentType(), //contentEncoding, + origTransferBody.getContentType(), //contentType + origTransferBody.getCorrelationId(), //correlationId + origTransferBody.getDeliveryMode(), //deliveryMode non persistant + new AMQShortString(exchangeName),// destination + new AMQShortString(exchangeName),// exchange + origTransferBody.getExpiration(), //expiration + origTransferBody.getImmediate(), //immediate + origTransferBody.getMandatory(), //mandatory + origTransferBody.getMessageId(), //messageId + origTransferBody.getPriority(), //priority + origTransferBody.getRedelivered(), //redelivered + origTransferBody.getReplyTo(), //replyTo + new AMQShortString(routingKey), //routingKey, + "abc".getBytes(), //securityToken + origTransferBody.ticket, //ticket + System.currentTimeMillis(), //timestamp + origTransferBody.getTransactionId(), //transactionId + origTransferBody.getTtl(), //ttl, + origTransferBody.getUserId() //userId + ); + AMQMessage newMsg = new AMQMessage(origMsg.getMessageStore(),transferBody,origMsg.getTransactionContext()); + + return newMsg; + } + + public static void setSynConfig(SynapseConfiguration synCfg) { + MessageContextCreatorForQpid.synCfg = synCfg; + } + + public static void setSynEnv(SynapseEnvironment synEnv) { + MessageContextCreatorForQpid.synEnv = synEnv; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java new file mode 100644 index 0000000000..60fdc40788 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java @@ -0,0 +1,74 @@ +package org.apache.qpid.server.exchange.synapse; + +import org.apache.axis2.addressing.EndpointReference; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseException; +import org.apache.synapse.config.SynapseConfiguration; +import org.apache.synapse.core.SynapseEnvironment; +import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.endpoints.utils.EndpointDefinition; +import org.apache.synapse.statistics.StatisticsCollector; + +public class QpidSynapseEnvironment implements SynapseEnvironment +{ + + private static final Log log = LogFactory.getLog(QpidSynapseEnvironment.class); + + private SynapseConfiguration synapseConfig; + + private StatisticsCollector statisticsCollector; + + private SynapseExchange qpidExchange; + + public QpidSynapseEnvironment(SynapseConfiguration synapseConfig, SynapseExchange qpidExchange) + { + this.synapseConfig = synapseConfig; + this.qpidExchange = qpidExchange; + } + + public MessageContext createMessageContext() + { + org.apache.axis2.context.MessageContext axis2MC = new org.apache.axis2.context.MessageContext(); + MessageContext mc = new Axis2MessageContext(axis2MC, synapseConfig, this); + return mc; + } + + public StatisticsCollector getStatisticsCollector() + { + return statisticsCollector; + } + + public void injectMessage(MessageContext synCtx) + { + + synCtx.getMainSequence().mediate(synCtx); + } + + public void send(EndpointDefinition endpoint, MessageContext smc) + { + if(endpoint != null) + { + smc.setTo(new EndpointReference(endpoint.getAddress())); + AMQMessage newMessage = MessageContextCreatorForQpid.getAMQMessage(smc); + try + { + qpidExchange.getExchangeRegistry().routeContent(newMessage); + } + catch(Exception e) + { + throw new SynapseException("Faulty endpoint",e); + } + } + + } + + public void setStatisticsCollector(StatisticsCollector statisticsCollector) + { + this.statisticsCollector = statisticsCollector; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java new file mode 100644 index 0000000000..c408529fbd --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java @@ -0,0 +1,104 @@ +package org.apache.qpid.server.exchange.synapse; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.exchange.AbstractExchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.synapse.Constants; +import org.apache.synapse.MessageContext; +import org.apache.synapse.config.SynapseConfiguration; +import org.apache.synapse.config.SynapseConfigurationBuilder; +import org.apache.synapse.core.SynapseEnvironment; + +public class SynapseExchange extends AbstractExchange +{ + + public final static AMQShortString TYPE = new AMQShortString("synapse"); + + private SynapseEnvironment synEnv; + + private ExchangeRegistry exchangeRegistry; + + public SynapseExchange() + { + super(); + } + + @Override + public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws AMQException + { + super.initialise(host, name, durable, ticket, autoDelete, exchangeRegistry); + + String config = System.getProperty(Constants.SYNAPSE_XML); + SynapseConfiguration synapseConfiguration = SynapseConfigurationBuilder.getConfiguration(config); + synEnv = new QpidSynapseEnvironment(synapseConfiguration,this); + MessageContextCreatorForQpid.setSynConfig(synapseConfiguration); + MessageContextCreatorForQpid.setSynEnv(synEnv); + this.exchangeRegistry = exchangeRegistry; + } + + @Override + protected ExchangeMBean createMBean() throws AMQException + { + // TODO Auto-generated method stub + return null; + } + + public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException + { + throw new UnsupportedOperationException("This exchange does not take bindings"); + } + + public AMQShortString getType() + { + return TYPE; + } + + public boolean hasBindings() throws AMQException + { + return false; + } + + public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException + { + throw new UnsupportedOperationException("This exchange does not take bindings"); + } + + public boolean isBound(AMQShortString routingKey) throws AMQException + { + throw new UnsupportedOperationException("This exchange does not take bindings"); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + throw new UnsupportedOperationException("This exchange does not take bindings"); + } + + public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException + { + throw new UnsupportedOperationException("This exchange does not take bindings"); + } + + public void route(AMQMessage message) throws AMQException + { + try + { + MessageContext mc = MessageContextCreatorForQpid.getSynapseMessageContext(message); + synEnv.injectMessage(mc); + } + catch(Exception e) + { + throw new AMQException("Error occurred while trying to mediate message through Synapse",e); + } + } + + public ExchangeRegistry getExchangeRegistry() + { + return exchangeRegistry; + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java new file mode 100644 index 0000000000..09ef377ef7 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java @@ -0,0 +1,62 @@ +package org.apache.qpid.server.exchange.synapse; + +import javax.activation.DataHandler; +import javax.xml.namespace.QName; + +import org.apache.axiom.attachments.ByteArrayDataSource; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMText; +import org.apache.axiom.soap.SOAPFactory; +import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory; +import org.apache.synapse.Mediator; +import org.apache.synapse.MessageContext; + +public class TestClassMediator implements Mediator +{ + + public int getTraceState() + { + // TODO Auto-generated method stub + return 0; + } + + public String getType() + { + // TODO Auto-generated method stub + return null; + } + + public boolean mediate(MessageContext mc) + { + SOAPFactory soapFactory = new SOAP11Factory(); + OMElement binaryNode = mc.getEnvelope().getBody().getFirstChildWithName(new QName("payload")); + byte[] source = binaryNode.getText().getBytes(); + + byte[] b = new byte[source.length]; + int j = 0; + for(int i=source.length-1; i>0; i--) + { + b[j] = source[i]; + j++; + } + + mc.getEnvelope().getBody().getFirstChildWithName(new QName("payload")).detach(); + + DataHandler dataHandler = new DataHandler( + new ByteArrayDataSource(b)); + OMText textData = soapFactory.createOMText(dataHandler, true); + OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null); + wrapper.addChild(textData); + mc.setDoingMTOM(true); + + mc.getEnvelope().getBody().addChild(wrapper); + return true; + } + + public void setTraceState(int arg0) + { + // TODO Auto-generated method stub + + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index 7b129f0187..067eef5003 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -79,7 +79,7 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { try { - exchange = exchangeFactory.createExchange(body.exchange, body.type, body.durable, + exchange = exchangeFactory.createExchange(exchangeRegistry,body.exchange, body.type, body.durable, body.passive, body.ticket); exchangeRegistry.registerExchange(exchange); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java index 8b5f05e8ea..df2d840bc9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java @@ -34,12 +34,15 @@ public class ExchangeInitialiser define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); + //There should be an extention mechanism to register + define(registry,factory,new AMQShortString("amq.synapse"),new AMQShortString("synapse")); + registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME)); } private void define(ExchangeRegistry r, ExchangeFactory f, AMQShortString name, AMQShortString type) throws AMQException { - r.registerExchange(f.createExchange(name, type, true, false, 0)); + r.registerExchange(f.createExchange(r,name, type, true, false, 0)); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 711e045516..59f88e2f43 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -643,4 +643,13 @@ public class AMQMessage return _requestId; } + public MessageStore getMessageStore() + { + return _store; + } + + public TransactionalContext getTransactionContext() + { + return _txnContext; + } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java index bde2a7bfea..9aea71f3bb 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java @@ -27,24 +27,13 @@ public class QpidTestClient QpidExchangeHelper exchangeHelper = session.getExchangeHelper(); exchangeHelper.declareExchange(false, false, QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, QpidConstants.DIRECT_EXCHANGE_CLASS); - QpidQueueHelper queueHelper = session.getQueueHelper(); - queueHelper.declareQueue(false, false, false, false, false, "myQueue"); - queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", "RH"); + exchangeHelper.declareExchange(false, false, QpidConstants.SYNAPSE_EXCHANGE_NAME, false, false, false, QpidConstants.SYNAPSE_EXCHANGE_CLASS); + + //contentBasedRoutingSample(session); - MessageHeaders msgHeaders = new MessageHeaders(); - msgHeaders.setRoutingKey(new AMQShortString("RH")); - msgHeaders.setExchange(new AMQShortString(QpidConstants.DIRECT_EXCHANGE_NAME)); - AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,"test".getBytes()); + //transformationSample2(session); - QpidMessageProducer messageProducer = session.createProducer(); - messageProducer.open(); - messageProducer.send(false, true, msg); - - QpidMessageConsumer messageConsumer = session.createConsumer("myQueue", false, false); - messageConsumer.open(); - - AMQPApplicationMessage msg2 = messageConsumer.receive(); - System.out.println(msg.toString()); + binaryMessageTransformations(session); } catch(Exception e) { @@ -53,4 +42,159 @@ public class QpidTestClient } + public static void contentBasedRoutingSample(QpidSession session) throws Exception + { + String tmp = "<m:troubleTicket xmlns:m=\"http://redhat.com/sample\"><m:customerId>532535</m:customerId><m:priority>"; + String tmp2 = "</m:priority><m:appId>ESB</m:appId><m:desc>blabla</m:desc></m:troubleTicket>"; + + //Create queues + QpidQueueHelper queueHelper = session.getQueueHelper(); + queueHelper.declareQueue(false, false, false, false, false, "criticalTicketQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "criticalTicketQueue", "criticalTicket"); + + queueHelper.declareQueue(false, false, false, false, false, "lowPriorityTicketQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "lowPriorityTicketQueue", "lowPriorityTicket"); + + queueHelper.declareQueue(false, false, false, false, false, "ticketQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "ticketQueue", "troubleTicket"); + + QpidMessageProducer messageProducer = session.createProducer(); + messageProducer.open(); + + MessageHeaders msgHeaders = new MessageHeaders(); + msgHeaders.setContentType(new AMQShortString("text/xml")); + msgHeaders.setRoutingKey(new AMQShortString("defectSystem")); + msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + + StringBuffer buf = new StringBuffer(); + buf.append(tmp).append("critical").append(tmp2); + AMQPApplicationMessage criticalMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes()); + System.out.println(criticalMsg.toString()); + messageProducer.send(false, true, criticalMsg); + + buf = new StringBuffer(); + buf.append(tmp).append("low").append(tmp2); + AMQPApplicationMessage lowMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes()); + System.out.println(lowMsg.toString()); + messageProducer.send(false, true, lowMsg); + + buf = new StringBuffer(); + buf.append(tmp).append("high").append(tmp2); + AMQPApplicationMessage highMsg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes()); + System.out.println(highMsg.toString()); + messageProducer.send(false, true, highMsg); + + QpidMessageConsumer messageConsumerCritical = session.createConsumer("criticalTicketQueue", false, false); + messageConsumerCritical.open(); + AMQPApplicationMessage criticalMsgRcv = messageConsumerCritical.receive(); + System.out.println(criticalMsgRcv.toString()); + + QpidMessageConsumer messageConsumerLow = session.createConsumer("lowPriorityTicketQueue", false, false); + messageConsumerLow.open(); + AMQPApplicationMessage lowMsgRcv = messageConsumerLow.receive(); + System.out.println(lowMsgRcv.toString()); + + QpidMessageConsumer messageConsumer = session.createConsumer("ticketQueue", false, false); + messageConsumer.open(); + AMQPApplicationMessage msgRcv = messageConsumer.receive(); + System.out.println(msgRcv.toString()); + + } + + public static void transformationSample(QpidSession session) throws Exception + { + String tmp = "<m:quote xmlns:m=\"http://redhat.com/sample\"><m:ticker>RHT</m:ticker><m:value>125</m:value></m:quote>"; + + QpidQueueHelper queueHelper = session.getQueueHelper(); + queueHelper.declareQueue(false, false, false, false, false, "stockQuoteQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "stockQuoteQueue", "stockQuote"); + + MessageHeaders msgHeaders = new MessageHeaders(); + msgHeaders.setContentType(new AMQShortString("text/xml")); + msgHeaders.setRoutingKey(new AMQShortString("stockQuote")); + msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + + AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,tmp.getBytes()); + + QpidMessageProducer messageProducer = session.createProducer(); + messageProducer.open(); + System.out.println(msg.toString()); + messageProducer.send(false, true, msg); + } + + public static void transformationSample2(QpidSession session) throws Exception + { + StringBuffer buf = new StringBuffer(); + buf.append("<m:vacationPackage xmlns:m=\"http://redhat.com/sample\">"); + buf.append("<m:customerFirstName>Rajith</m:customerFirstName>"); + buf.append("<m:customerLastName>Rajith</m:customerLastName>"); + buf.append("<m:customerAddress>3349 Missississauga Road,Mississauga,ON, L5L 1J7</m:customerAddress>"); + buf.append("<m:customerDOB>Mississauga</m:customerDOB>"); + buf.append("<m:paymentInfo>Visa,456454574575325325235,05122007</m:paymentInfo>"); + buf.append("<m:start>12072007</m:start>"); + buf.append("<m:end>18072007</m:end>"); + buf.append("<m:airTicket>"); + buf.append("<m:airline>AC</m:airline>"); + buf.append("<m:seatPreference>W</m:seatPreference>"); + buf.append("<m:frequentFlyer>643663345</m:frequentFlyer>"); + buf.append("</m:airTicket>"); + buf.append("<m:Hotel>"); + buf.append("<m:noOfDays>5</m:noOfDays>"); + buf.append("<m:rating>5</m:rating>"); + buf.append("<m:meals>AI</m:meals>"); + buf.append("</m:Hotel>"); + buf.append("<m:carRental>"); + buf.append("<m:from>14062007</m:from>"); + buf.append("<m:to>16062007</m:to>"); + buf.append("</m:carRental>"); + buf.append("</m:vacationPackage>"); + + QpidQueueHelper queueHelper = session.getQueueHelper(); + queueHelper.declareQueue(false, false, false, false, false, "carRentalQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "carRentalQueue", "carRental"); + + queueHelper.declareQueue(false, false, false, false, false, "hotelQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "hotelQueue", "hotel"); + + queueHelper.declareQueue(false, false, false, false, false, "airlineQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "airlineQueue", "airline"); + + MessageHeaders msgHeaders = new MessageHeaders(); + msgHeaders.setContentType(new AMQShortString("text/xml")); + msgHeaders.setRoutingKey(new AMQShortString("vacationPackage")); + msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + + AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf.toString().getBytes()); + + QpidMessageProducer messageProducer = session.createProducer(); + messageProducer.open(); + System.out.println(msg.toString()); + messageProducer.send(false, true, msg); + } + + public static void binaryMessageTransformations(QpidSession session) throws Exception + { + QpidQueueHelper queueHelper = session.getQueueHelper(); + queueHelper.declareQueue(false, false, false, false, false, "binaryQueue"); + queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "binaryQueue", "binary"); + + MessageHeaders msgHeaders = new MessageHeaders(); + msgHeaders.setContentType(new AMQShortString("application/octet-stream")); + msgHeaders.setRoutingKey(new AMQShortString("binary")); + msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME)); + + byte[] buf = new byte[]{72,101,108,108,111}; + + AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders,buf); + + QpidMessageProducer messageProducer = session.createProducer(); + messageProducer.open(); + System.out.println(msg.toString()); + messageProducer.send(false, true, msg); + } + } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java index 96103509cf..5001d50548 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java @@ -22,6 +22,10 @@ public class QpidConstants public final static String FANOUT_EXCHANGE_CLASS = "fanout"; + public final static String SYNAPSE_EXCHANGE_NAME = "amq.synapse"; + + public final static String SYNAPSE_EXCHANGE_CLASS = "synapse"; + public final static String SYSTEM_MANAGEMENT_EXCHANGE_NAME = "qpid.sysmgmt"; diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java index f14825ed31..5725de9caa 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java @@ -8,56 +8,56 @@ import org.apache.qpid.nclient.config.ClientConfiguration; public class PhaseFactory { - /** - * This method will create the pipe and return a reference - * to the top of the pipeline. - * - * The application can then use this (top most) phase and all - * calls will propogated down the pipe. - * - * Simillar calls orginating at the bottom of the pipeline - * will be propogated to the top. - * - * @param ctx - * @return - * @throws AMQPException - */ - public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException - { - String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE; - Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>(); - List<String> list = ClientConfiguration.get().getList(key); - int index = 0; - for(String s:list) + /** + * This method will create the pipe and return a reference + * to the top of the pipeline. + * + * The application can then use this (top most) phase and all + * calls will propogated down the pipe. + * + * Simillar calls orginating at the bottom of the pipeline + * will be propogated to the top. + * + * @param ctx + * @return + * @throws AMQPException + */ + public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException { - try - { - Phase temp = (Phase)Class.forName(s).newInstance(); - phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX),temp) ; - } - catch(Exception e) - { - throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e); - } - index++; - } - - Phase current = null; - Phase prev = null; - Phase next = null; - //Lets build the phase pipe. - for (int i=0; i<phaseMap.size();i++) - { - current = phaseMap.get(i); - if (i+1 < phaseMap.size()) - { - next = phaseMap.get(i+1); - } - current.init(ctx, next, prev); - prev = current; - next = null; + String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE; + Map<Integer, Phase> phaseMap = new HashMap<Integer, Phase>(); + List<String> list = ClientConfiguration.get().getList(key); + int index = 0; + for (String s : list) + { + try + { + Phase temp = (Phase) Class.forName(s).newInstance(); + phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + AMQPConstants.INDEX), temp); + } + catch (Exception e) + { + throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s), e); + } + index++; + } + + Phase current = null; + Phase prev = null; + Phase next = null; + //Lets build the phase pipe. + for (int i = 0; i < phaseMap.size(); i++) + { + current = phaseMap.get(i); + if (i + 1 < phaseMap.size()) + { + next = phaseMap.get(i + 1); + } + current.init(ctx, next, prev); + prev = current; + next = null; + } + + return current; } - - return current; - } }
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java index fe0231522c..57ce513267 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java @@ -70,7 +70,14 @@ public class QpidMessageConsumerImpl extends AbstractResource implements QpidMes public AMQPApplicationMessage receive()throws QpidException { checkClosed(); - return _queue.poll(); + try + { + return _queue.take(); + } + catch (InterruptedException e) + { + throw new QpidException("Error occurred while retrieving message",e); + } } public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws QpidException diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java index 91298dcc57..c20a76a247 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java @@ -82,8 +82,8 @@ public class QpidMessageProducerImpl extends AbstractResource implements QpidMes msgHeaders.getContentType(), //contentType msgHeaders.getCorrelationId(), //correlationId msgHeaders.getDeliveryMode(), //deliveryMode non persistant - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange + msgHeaders.getDestination(),// destination + msgHeaders.getExchange(),// exchange msgHeaders.getExpiration(), //expiration msgHeaders.isImmediate(), //immediate msgHeaders.isMandatory(), //mandatory |