diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-04-04 11:45:45 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-04-04 11:45:45 +0000 |
| commit | 3bd6508fed99480106ef01ee348006954092bd6f (patch) | |
| tree | 285f29849b65b848ee972ae695f3713570be22b6 /qpid | |
| parent | 0ad68d54aaf249b1f0fb8e712461bb8af6841321 (diff) | |
| download | qpid-python-3bd6508fed99480106ef01ee348006954092bd6f.tar.gz | |
QPID-3401 Added code to extract information from an address string and
populate Node and Link data structures.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1309341 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
7 files changed, 209 insertions, 55 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/DestinationStringParser.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/DestinationStringParser.java index c4c6f1f7e3..3b5bd74f76 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/DestinationStringParser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/DestinationStringParser.java @@ -23,29 +23,67 @@ package org.apache.qpid.jms; import java.net.URISyntaxException; import java.util.Collections; -import org.apache.qpid.jms.QpidDestination.Type; +import org.apache.qpid.jms.QpidDestination.DestinationType; import org.apache.qpid.messaging.Address; import org.apache.qpid.messaging.address.AddressException; +import org.apache.qpid.messaging.address.Link; +import org.apache.qpid.messaging.address.Link.Reliability; import org.apache.qpid.messaging.address.Node; import org.apache.qpid.messaging.address.Node.AddressPolicy; +import org.apache.qpid.messaging.address.Node.NodeType; import org.apache.qpid.messaging.util.AddressHelper; import org.apache.qpid.url.AMQBindingURL; public class DestinationStringParser { - public static Address parseAddressString(String str, Type type) throws AddressException + public static Address parseAddressString(String str, DestinationType type) throws AddressException { Address addr = Address.parse(str); AddressHelper helper = new AddressHelper(addr); + Node node = new Node(); + node.setName(addr.getName()); node.setAssertPolicy(AddressPolicy.getAddressPolicy(helper.getAssert())); node.setCreatePolicy(AddressPolicy.getAddressPolicy(helper.getCreate())); node.setDeletePolicy(AddressPolicy.getAddressPolicy(helper.getDelete())); + node.setDurable(helper.isNodeDurable()); + + if (DestinationType.TOPIC == type) + { + if (helper.getNodeType() == NodeType.QUEUE) + { + throw new AddressException("Destination is marked as a Topic, but address is defined as a Queue"); + } + node.setType(NodeType.TOPIC); + } + else + { + if (helper.getNodeType() == NodeType.TOPIC) + { + throw new AddressException("Destination is marked as a Queue, but address is defined as a Topic"); + } + node.setType(NodeType.QUEUE); + } + + node.setDeclareProps(helper.getNodeDeclareArgs()); + node.setBindingProps(helper.getNodeBindings()); + addr.setNode(node); + + Link link = new Link(); + link.setName(helper.getLinkName()); + link.setDurable(helper.isLinkDurable()); + link.setReliability(Reliability.getReliability(helper.getLinkReliability())); + link.setProducerCapacity(helper.getProducerCapacity()); + link.setConsumerCapacity(helper.getConsumeCapacity()); + link.setDeclareProps(helper.getLinkDeclareArgs()); + link.setBindingProps(helper.getLinkBindings()); + link.setSubscribeProps(helper.getLinkSubscribeArgs()); + addr.setLink(link); return addr; } - public static Address parseBURLString(String str, Type type) throws AddressException + public static Address parseBURLString(String str, DestinationType type) throws AddressException { AMQBindingURL burl; try @@ -60,7 +98,7 @@ public class DestinationStringParser } Address addr; - if (type == Type.TOPIC) + if (type == DestinationType.TOPIC) { addr = new Address(burl.getExchangeName().asString(), burl.getRoutingKey().asString(), diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidDestination.java index 5166cce59a..a488679379 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidDestination.java @@ -36,17 +36,17 @@ import org.slf4j.LoggerFactory; public abstract class QpidDestination implements Destination, Referenceable { - public enum Type {QUEUE, TOPIC}; + public enum DestinationType {QUEUE, TOPIC}; private static final Logger _logger = LoggerFactory.getLogger(QpidDestination.class); private static final DestSyntax defaultDestSyntax; private DestSyntax _destSyntax = DestSyntax.ADDR; - protected final Type type; + protected final DestinationType type; protected String destinationString; protected Address address; - protected QpidDestination(Type type) + protected QpidDestination(DestinationType type) { this.type = type; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidQueue.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidQueue.java index 18dafffd37..a1183240d6 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidQueue.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidQueue.java @@ -27,12 +27,12 @@ public class QpidQueue extends QpidDestination implements Queue { public QpidQueue() { - super(Type.QUEUE); + super(DestinationType.QUEUE); } public QpidQueue(String str) throws JMSException { - super(Type.QUEUE); + super(DestinationType.QUEUE); setDestinationString(str); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidTopic.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidTopic.java index 6d34303fa2..f41897aa00 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidTopic.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/QpidTopic.java @@ -23,18 +23,18 @@ package org.apache.qpid.jms; import javax.jms.JMSException; import javax.jms.Topic; -import org.apache.qpid.jms.QpidDestination.Type; +import org.apache.qpid.jms.QpidDestination.DestinationType; public class QpidTopic extends QpidDestination implements Topic { public QpidTopic() { - super(Type.TOPIC); + super(DestinationType.TOPIC); } public QpidTopic(String str) throws JMSException { - super(Type.TOPIC); + super(DestinationType.TOPIC); setDestinationString(str); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/address/Link.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/address/Link.java index 0e01b010bb..1a6775df57 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/address/Link.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/address/Link.java @@ -23,6 +23,7 @@ package org.apache.qpid.messaging.address; import static org.apache.qpid.messaging.address.Link.Reliability.AT_LEAST_ONCE; import java.util.Collections; +import java.util.List; import java.util.Map; public class Link @@ -42,13 +43,16 @@ public class Link if (reliability == null) { return AT_LEAST_ONCE; - } else if (reliability.equalsIgnoreCase("unreliable")) + } + else if (reliability.equalsIgnoreCase("unreliable")) { return UNRELIABLE; - } else if (reliability.equalsIgnoreCase("at-least-once")) + } + else if (reliability.equalsIgnoreCase("at-least-once")) { return AT_LEAST_ONCE; - } else + } + else { throw new AddressException("The reliability mode '" + reliability + "' is not yet supported"); @@ -65,9 +69,9 @@ public class Link protected int producerCapacity = 0; protected Reliability reliability = AT_LEAST_ONCE; - protected Map<String, Object> xDeclareProps = (Map<String, Object>) Collections.EMPTY_MAP; - protected Map<String, Object> xBindingProps = (Map<String, Object>) Collections.EMPTY_MAP; - protected Map<String, Object> xSubscribeProps = (Map<String, Object>) Collections.EMPTY_MAP; + protected Map<String, Object> xDeclareProps = Collections.emptyMap(); + protected List<Object> xBindingProps = Collections.emptyList(); + protected Map<String, Object> xSubscribeProps = Collections.emptyMap(); public Reliability getReliability() { @@ -109,17 +113,17 @@ public class Link return name; } - public Map<String, Object> getXDeclareProperties() + public Map<String, Object> getDeclareProperties() { return xDeclareProps; } - public Map<String, Object> getXBindingProperties() + public List<Object> getBindingProperties() { return xBindingProps; } - public Map<String, Object> getXSubscribeProperties() + public Map<String, Object> getSubscribeProperties() { return xSubscribeProps; } @@ -164,17 +168,17 @@ public class Link this.reliability = reliability; } - public void setxDeclareProps(Map<String, Object> xDeclareProps) + public void setDeclareProps(Map<String, Object> xDeclareProps) { this.xDeclareProps = xDeclareProps; } - public void setxBindingProps(Map<String, Object> xBindingProps) + public void setBindingProps(List<Object> xBindingProps) { this.xBindingProps = xBindingProps; } - public void setxSubscribeProps(Map<String, Object> xSubscribeProps) + public void setSubscribeProps(Map<String, Object> xSubscribeProps) { this.xSubscribeProps = xSubscribeProps; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/address/Node.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/address/Node.java index e410f68445..dbda939307 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/address/Node.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/address/Node.java @@ -21,6 +21,7 @@ package org.apache.qpid.messaging.address; import java.util.Collections; +import java.util.List; import java.util.Map; import org.apache.qpid.messaging.address.Link.Reliability; @@ -60,13 +61,13 @@ public class Node public enum NodeType { - QUEUE, TOPIC, UNRESOLVED; + QUEUE, TOPIC; public static NodeType getNodeType(String type) throws AddressException { if (type == null) { - return UNRESOLVED; + return QUEUE; // defaults to queue } else if (type.equalsIgnoreCase("queue")) { @@ -85,14 +86,14 @@ public class Node protected String name; protected boolean durable = false; - protected NodeType type = NodeType.UNRESOLVED; + protected NodeType type = NodeType.QUEUE; protected AddressPolicy createPolicy = AddressPolicy.NEVER; protected AddressPolicy assertPolicy = AddressPolicy.NEVER; protected AddressPolicy deletePolicy = AddressPolicy.NEVER; - protected Map<String, Object> xDeclareProps = (Map<String, Object>) Collections.EMPTY_MAP; - protected Map<String, Object> xBindingProps = (Map<String, Object>) Collections.EMPTY_MAP; + protected Map<String, Object> xDeclareProps = Collections.emptyMap(); + protected List<Object> xBindingProps = Collections.emptyList(); public String getName() { @@ -124,12 +125,12 @@ public class Node return deletePolicy; } - public Map<String, Object> getXDeclareProperties() + public Map<String, Object> getDeclareProperties() { return xDeclareProps; } - public Map<String, Object> getXBindingProperties() + public List<Object> getBindingProperties() { return xBindingProps; } @@ -164,12 +165,12 @@ public class Node this.deletePolicy = deletePolicy; } - public void setxDeclareProps(Map<String, Object> xDeclareProps) + public void setDeclareProps(Map<String, Object> xDeclareProps) { this.xDeclareProps = xDeclareProps; } - public void setxBindingProps(Map<String, Object> xBindingProps) + public void setBindingProps(List<Object> xBindingProps) { this.xBindingProps = xBindingProps; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/AddressHelper.java b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/AddressHelper.java index 05cab69118..736b4f7376 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/AddressHelper.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/messaging/util/AddressHelper.java @@ -26,7 +26,12 @@ import java.util.Map; import org.apache.qpid.configuration.Accessor; import org.apache.qpid.configuration.Accessor.MapAccessor; +import org.apache.qpid.configuration.Accessor.NestedMapAccessor; import org.apache.qpid.messaging.Address; +import org.apache.qpid.messaging.address.Node.NodeType; +import org.apache.qpid.messaging.address.AddressException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utility class for extracting information from the address class @@ -62,21 +67,23 @@ public class AddressHelper public static final String RELIABILITY = "reliability"; private Address address; - private MapAccessor addressProps; - private MapAccessor nodeProps; - private MapAccessor linkProps; + private NestedMapAccessor addressProps; + private NestedMapAccessor nodeProps; + private NestedMapAccessor linkProps; + + private static final Logger _logger = LoggerFactory.getLogger(AddressHelper.class); public AddressHelper(Address address) { this.address = address; - addressProps = new MapAccessor(address.getOptions()); + addressProps = new NestedMapAccessor(address.getOptions()); Map node_props = address.getOptions() == null || address.getOptions().get(NODE) == null ? null : (Map) address.getOptions().get(NODE); if (node_props != null) { - nodeProps = new MapAccessor(node_props); + nodeProps = new NestedMapAccessor(node_props); } Map link_props = address.getOptions() == null @@ -85,7 +92,7 @@ public class AddressHelper if (link_props != null) { - linkProps = new MapAccessor(link_props); + linkProps = new NestedMapAccessor(link_props); } } @@ -104,40 +111,144 @@ public class AddressHelper return addressProps.getString(DELETE); } - public boolean isNodeMarkedNoLocal() - { - Boolean b = nodeProps.getBoolean(NO_LOCAL); - return b == null ? false : b; - } - public boolean isBrowseOnly() { String mode = addressProps.getString(MODE); return mode != null && mode.equals(BROWSE) ? true : false; } - @SuppressWarnings("unchecked") + public boolean isNodeDurable() + { + return getDurability(nodeProps); + } + + public boolean isLinkDurable() + { + return getDurability(linkProps); + } + + private boolean getDurability(NestedMapAccessor map) + { + Boolean result = map.getBoolean(DURABLE); + return (result == null) ? false : result.booleanValue(); + } + + public NodeType getNodeType() throws AddressException + { + return NodeType.getNodeType(nodeProps.getString(TYPE)); + } + public List<Object> getNodeBindings() { - return (List<Object>) nodeProps.getList(X_BINDINGS); + return getBindigs(nodeProps); } - public Map getDeclareArgs(Map props) + public List<Object> getLinkBindings() { - if (props != null && props.get(X_DECLARE) != null) + return getBindigs(linkProps); + } + + private List<Object> getBindigs(NestedMapAccessor map) + { + List<Object> bindings = (List<Object>) map.getList(X_BINDINGS); + if (bindings == null) { - return (Map) props.get(X_DECLARE); + return Collections.emptyList(); } else { - return Collections.EMPTY_MAP; + return bindings; } } - private boolean getDurability(Map map) + public Map<String,Object> getNodeDeclareArgs() { - Accessor access = new MapAccessor(map); - Boolean result = access.getBoolean(DURABLE); - return (result == null) ? false : result.booleanValue(); + return getDeclareArgs(nodeProps); + } + + public Map<String,Object> getLinkDeclareArgs() + { + return getDeclareArgs(linkProps); + } + + private Map<String,Object> getDeclareArgs(NestedMapAccessor map) + { + Map<String,Object> args = map.getMap(X_DECLARE); + if (args == null) + { + return Collections.emptyMap(); + } + else + { + return args; + } + } + + public Map<String,Object> getLinkSubscribeArgs() + { + Map<String,Object> args = linkProps.getMap(X_SUBSCRIBE); + if (args == null) + { + return Collections.emptyMap(); + } + else + { + return args; + } + } + + public String getLinkName() + { + return linkProps.getString(NAME); + } + + public String getLinkReliability() + { + return linkProps.getString(RELIABILITY); + } + + private int getCapacity(String type) + { + int capacity = 0; + try + { + capacity = linkProps.getInt(CAPACITY); + } + catch(Exception e) + { + try + { + capacity = linkProps.getInt(getFQN(CAPACITY,type)); + } + catch(Exception ex) + { + if (ex instanceof NumberFormatException && !ex.getMessage().equals("null")) + { + _logger.info("Unable to retrieve capacity from address: " + address,ex); + } + } + } + + return capacity; + } + + public int getProducerCapacity() + { + return getCapacity(CAPACITY_TARGET); + } + + public int getConsumeCapacity() + { + return getCapacity(CAPACITY_SOURCE); + } + + public static String getFQN(String... propNames) + { + StringBuilder sb = new StringBuilder(); + for(String prop: propNames) + { + sb.append(prop).append("/"); + } + return sb.substring(0, sb.length() -1); } } |
