summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-09-25 15:15:18 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-09-25 15:15:18 +0000
commit3bdd4780e35b4454477cf423b31ad6915df357fa (patch)
tree2e29ad7a4be731a503869d80d4c8a63e203bd2af /qpid/java
parent52c2bdf4d12655bf341151ed957bea5656b9723d (diff)
downloadqpid-python-3bdd4780e35b4454477cf423b31ad6915df357fa.tar.gz
NO-JIRA : set svn:eol-style to native
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1526202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java862
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java472
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java470
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java824
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java296
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java1230
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java904
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java768
8 files changed, 2913 insertions, 2913 deletions
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
index 5e77b7097c..26b0d80783 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
@@ -1,431 +1,431 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-public class Demo extends Util
-{
- private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:";
- private static final String OPCODE = "opcode";
- private static final String ACTION = "action";
- private static final String MESSAGE_ID = "message-id";
- private static final String VENDOR = "vendor";
- private static final String LOG = "log";
- private static final String RECEIVED = "received";
- private static final String TEST = "test";
- private static final String APACHE = "apache";
- private static final String SENT = "sent";
- private static final String LINK_REF = "link-ref";
- private static final String HOST = "host";
- private static final String PORT = "port";
- private static final String SASL_USER = "sasl-user";
- private static final String SASL_PASSWORD = "sasl-password";
- private static final String ROLE = "role";
- private static final String ADDRESS = "address";
- private static final String SENDER = "sender";
- private static final String SEND_MESSAGE = "send-message";
- private static final String ANNOUNCE = "announce";
- private static final String MESSAGE_VENDOR = "message-vendor";
- private static final String CREATE_LINK = "create-link";
-
- public static void main(String[] args)
- {
- new Demo(args).run();
- }
-
- public Demo(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return false;
- }
-
- public void run()
- {
-
- try
- {
-
- final String vendor = getArgs()[0];
- final String queue = "control";
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
-
- Receiver responseReceiver;
-
- responseReceiver = session.createTemporaryQueueReceiver();
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode());
-
-
- Properties properties = new Properties();
- properties.setMessageId(java.util.UUID.randomUUID());
- properties.setReplyTo(responseReceiver.getAddress());
-
- HashMap appPropMap = new HashMap();
- ApplicationProperties appProperties = new ApplicationProperties(appPropMap);
-
- appPropMap.put(OPCODE, ANNOUNCE);
- appPropMap.put(VENDOR, vendor);
- appPropMap.put(ADDRESS,responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { properties, appProperties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- try
- {
- s.send(message1);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
- Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
-
-
- boolean done = false;
-
- while(!done)
- {
- boolean wait = true;
- Message m = responseReceiver.receive(false);
- if(m != null)
- {
- List<Section> payload = m.getPayload();
- wait = false;
- ApplicationProperties props = m.getApplicationProperties();
- Map map = props.getValue();
- String op = (String) map.get(OPCODE);
- if("reset".equals(op))
- {
- for(Sender sender : sendingLinks.values())
- {
- try
- {
- sender.close();
- Session session1 = sender.getSession();
- session1.close();
- session1.getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- for(Receiver receiver : receivingLinks.values())
- {
- try
- {
- receiver.close();
- receiver.getSession().close();
- receiver.getSession().getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- sendingLinks.clear();
- receivingLinks.clear();
- }
- else if(CREATE_LINK.equals(op))
- {
- Object linkRef = map.get(LINK_REF);
- String host = (String) map.get(HOST);
- Object o = map.get(PORT);
- int port = Integer.parseInt(String.valueOf(o));
- String user = (String) map.get(SASL_USER);
- String password = (String) map.get(SASL_PASSWORD);
- String role = (String) map.get(ROLE);
- String address = (String) map.get(ADDRESS);
- System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password);
- try{
-
-
- Connection conn2 = new Connection(host, port, user, password, host);
- Session session2 = conn2.createSession();
- if(sendingLinks.containsKey(linkRef))
- {
- try
- {
- sendingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(receivingLinks.containsKey(linkRef))
- {
- try
- {
- receivingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(SENDER.equals(role))
- {
-
- System.err.println("%%% Creating sender (" + linkRef + ")");
- Sender sender = session2.createSender(address);
- sendingLinks.put(linkRef, sender);
- }
- else
- {
-
- System.err.println("%%% Creating receiver (" + linkRef + ")");
- Receiver receiver2 = session2.createReceiver(address);
- receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
- receivingLinks.put(linkRef, receiver2);
- }
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- else if(SEND_MESSAGE.equals(op))
- {
- Sender sender = sendingLinks.get(map.get(LINK_REF));
- Properties m2props = new Properties();
- Object messageId = map.get(MESSAGE_ID);
- m2props.setMessageId(messageId);
- Map m2propmap = new HashMap();
- m2propmap.put(OPCODE, TEST);
- m2propmap.put(VENDOR, vendor);
- ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
- Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
- try
- {
- sender.send(m2);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, SENT);
- m3propmap.put(MESSAGE_ID, messageId);
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, vendor);
-
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+messageId)));
- try
- {
- s.send(m3);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- responseReceiver.acknowledge(m);
- }
- else
- {
- for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet())
- {
- m = entry.getValue().receive(false);
- if(m != null)
- {
- wait = false;
-
- System.err.println("%%% Received message from " + entry.getKey());
-
- Properties mp = m.getProperties();
- ApplicationProperties ap = m.getApplicationProperties();
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, RECEIVED);
- m3propmap.put(MESSAGE_ID, mp.getMessageId());
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR));
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+mp.getMessageId())));
- try
- {
- s.send(m3);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- entry.getValue().acknowledge(m);
- }
-
- }
- }
-
- if(wait)
- {
- try
- {
- Thread.sleep(500l);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- }
-
-
-
-
-
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- }
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return false;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+public class Demo extends Util
+{
+ private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:";
+ private static final String OPCODE = "opcode";
+ private static final String ACTION = "action";
+ private static final String MESSAGE_ID = "message-id";
+ private static final String VENDOR = "vendor";
+ private static final String LOG = "log";
+ private static final String RECEIVED = "received";
+ private static final String TEST = "test";
+ private static final String APACHE = "apache";
+ private static final String SENT = "sent";
+ private static final String LINK_REF = "link-ref";
+ private static final String HOST = "host";
+ private static final String PORT = "port";
+ private static final String SASL_USER = "sasl-user";
+ private static final String SASL_PASSWORD = "sasl-password";
+ private static final String ROLE = "role";
+ private static final String ADDRESS = "address";
+ private static final String SENDER = "sender";
+ private static final String SEND_MESSAGE = "send-message";
+ private static final String ANNOUNCE = "announce";
+ private static final String MESSAGE_VENDOR = "message-vendor";
+ private static final String CREATE_LINK = "create-link";
+
+ public static void main(String[] args)
+ {
+ new Demo(args).run();
+ }
+
+ public Demo(String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return false;
+ }
+
+ public void run()
+ {
+
+ try
+ {
+
+ final String vendor = getArgs()[0];
+ final String queue = "control";
+
+ String message = "";
+
+ Connection conn = newConnection();
+ Session session = conn.createSession();
+
+
+ Receiver responseReceiver;
+
+ responseReceiver = session.createTemporaryQueueReceiver();
+
+
+
+
+ responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+
+
+ Sender s = session.createSender(queue, getWindowSize(), getMode());
+
+
+ Properties properties = new Properties();
+ properties.setMessageId(java.util.UUID.randomUUID());
+ properties.setReplyTo(responseReceiver.getAddress());
+
+ HashMap appPropMap = new HashMap();
+ ApplicationProperties appProperties = new ApplicationProperties(appPropMap);
+
+ appPropMap.put(OPCODE, ANNOUNCE);
+ appPropMap.put(VENDOR, vendor);
+ appPropMap.put(ADDRESS,responseReceiver.getAddress());
+
+ AmqpValue amqpValue = new AmqpValue(message);
+ Section[] sections = { properties, appProperties, amqpValue};
+ final Message message1 = new Message(Arrays.asList(sections));
+
+ try
+ {
+ s.send(message1);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
+ Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
+
+
+ boolean done = false;
+
+ while(!done)
+ {
+ boolean wait = true;
+ Message m = responseReceiver.receive(false);
+ if(m != null)
+ {
+ List<Section> payload = m.getPayload();
+ wait = false;
+ ApplicationProperties props = m.getApplicationProperties();
+ Map map = props.getValue();
+ String op = (String) map.get(OPCODE);
+ if("reset".equals(op))
+ {
+ for(Sender sender : sendingLinks.values())
+ {
+ try
+ {
+ sender.close();
+ Session session1 = sender.getSession();
+ session1.close();
+ session1.getConnection().close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ for(Receiver receiver : receivingLinks.values())
+ {
+ try
+ {
+ receiver.close();
+ receiver.getSession().close();
+ receiver.getSession().getConnection().close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ sendingLinks.clear();
+ receivingLinks.clear();
+ }
+ else if(CREATE_LINK.equals(op))
+ {
+ Object linkRef = map.get(LINK_REF);
+ String host = (String) map.get(HOST);
+ Object o = map.get(PORT);
+ int port = Integer.parseInt(String.valueOf(o));
+ String user = (String) map.get(SASL_USER);
+ String password = (String) map.get(SASL_PASSWORD);
+ String role = (String) map.get(ROLE);
+ String address = (String) map.get(ADDRESS);
+ System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password);
+ try{
+
+
+ Connection conn2 = new Connection(host, port, user, password, host);
+ Session session2 = conn2.createSession();
+ if(sendingLinks.containsKey(linkRef))
+ {
+ try
+ {
+ sendingLinks.remove(linkRef).close();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ if(receivingLinks.containsKey(linkRef))
+ {
+ try
+ {
+ receivingLinks.remove(linkRef).close();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ if(SENDER.equals(role))
+ {
+
+ System.err.println("%%% Creating sender (" + linkRef + ")");
+ Sender sender = session2.createSender(address);
+ sendingLinks.put(linkRef, sender);
+ }
+ else
+ {
+
+ System.err.println("%%% Creating receiver (" + linkRef + ")");
+ Receiver receiver2 = session2.createReceiver(address);
+ receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+ receivingLinks.put(linkRef, receiver2);
+ }
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ else if(SEND_MESSAGE.equals(op))
+ {
+ Sender sender = sendingLinks.get(map.get(LINK_REF));
+ Properties m2props = new Properties();
+ Object messageId = map.get(MESSAGE_ID);
+ m2props.setMessageId(messageId);
+ Map m2propmap = new HashMap();
+ m2propmap.put(OPCODE, TEST);
+ m2propmap.put(VENDOR, vendor);
+ ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
+ Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
+ try
+ {
+ sender.send(m2);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ Map m3propmap = new HashMap();
+ m3propmap.put(OPCODE, LOG);
+ m3propmap.put(ACTION, SENT);
+ m3propmap.put(MESSAGE_ID, messageId);
+ m3propmap.put(VENDOR, vendor);
+ m3propmap.put(MESSAGE_VENDOR, vendor);
+
+
+ Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
+ new AmqpValue("AMQP-"+messageId)));
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ responseReceiver.acknowledge(m);
+ }
+ else
+ {
+ for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet())
+ {
+ m = entry.getValue().receive(false);
+ if(m != null)
+ {
+ wait = false;
+
+ System.err.println("%%% Received message from " + entry.getKey());
+
+ Properties mp = m.getProperties();
+ ApplicationProperties ap = m.getApplicationProperties();
+
+ Map m3propmap = new HashMap();
+ m3propmap.put(OPCODE, LOG);
+ m3propmap.put(ACTION, RECEIVED);
+ m3propmap.put(MESSAGE_ID, mp.getMessageId());
+ m3propmap.put(VENDOR, vendor);
+ m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR));
+
+ Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
+ new AmqpValue("AMQP-"+mp.getMessageId())));
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ entry.getValue().acknowledge(m);
+ }
+
+ }
+ }
+
+ if(wait)
+ {
+ try
+ {
+ Thread.sleep(500l);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ }
+
+ }
+
+
+
+
+
+
+
+
+
+ s.close();
+ session.close();
+ conn.close();
+
+ }
+ catch (ConnectionException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ protected boolean hasSingleLinkPerConnectionMode()
+ {
+ return false;
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
index dbe273182f..7845e318cb 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
@@ -1,236 +1,236 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-import java.util.Arrays;
-
-public class Request extends Util
-{
- private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:";
-
- public static void main(String[] args)
- {
- new Request(args).run();
- }
-
- public Request(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- public void run()
- {
-
- try
- {
-
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
- Connection conn2;
- Session session2;
- Receiver responseReceiver;
-
- if(isUseMultipleConnections())
- {
- conn2 = newConnection();
- session2 = conn2.createSession();
- responseReceiver = session2.createTemporaryQueueReceiver();
- }
- else
- {
- conn2 = null;
- session2 = null;
- responseReceiver = session.createTemporaryQueueReceiver();
- }
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode());
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- int received = 0;
-
- if(getArgs().length >= 2)
- {
- message = getArgs()[1];
- if(message.length() < getMessageSize())
- {
- StringBuilder builder = new StringBuilder(getMessageSize());
- builder.append(message);
- for(int x = message.length(); x < getMessageSize(); x++)
- {
- builder.append('.');
- }
- message = builder.toString();
- }
-
- for(int i = 0; i < getCount(); i++)
- {
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- properties.setReplyTo(responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { new Header() , properties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
-
- Message responseMessage = responseReceiver.receive(false);
- if(responseMessage != null)
- {
- responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn);
- received++;
- }
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
-
- while(received < getCount())
- {
- Message responseMessage = responseReceiver.receive();
- responseReceiver.acknowledge(responseMessage.getDeliveryTag());
- received++;
- }
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- if(session2 != null)
- {
- session2.close();
- conn2.close();
- }
- }
- catch (Exception e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return true;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+import java.util.Arrays;
+
+public class Request extends Util
+{
+ private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:";
+
+ public static void main(String[] args)
+ {
+ new Request(args).run();
+ }
+
+ public Request(String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return true;
+ }
+
+ public void run()
+ {
+
+ try
+ {
+
+
+ final String queue = getArgs()[0];
+
+ String message = "";
+
+ Connection conn = newConnection();
+ Session session = conn.createSession();
+
+ Connection conn2;
+ Session session2;
+ Receiver responseReceiver;
+
+ if(isUseMultipleConnections())
+ {
+ conn2 = newConnection();
+ session2 = conn2.createSession();
+ responseReceiver = session2.createTemporaryQueueReceiver();
+ }
+ else
+ {
+ conn2 = null;
+ session2 = null;
+ responseReceiver = session.createTemporaryQueueReceiver();
+ }
+
+
+
+
+ responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+
+
+ Sender s = session.createSender(queue, getWindowSize(), getMode());
+
+ Transaction txn = null;
+
+ if(useTran())
+ {
+ txn = session.createSessionLocalTransaction();
+ }
+
+ int received = 0;
+
+ if(getArgs().length >= 2)
+ {
+ message = getArgs()[1];
+ if(message.length() < getMessageSize())
+ {
+ StringBuilder builder = new StringBuilder(getMessageSize());
+ builder.append(message);
+ for(int x = message.length(); x < getMessageSize(); x++)
+ {
+ builder.append('.');
+ }
+ message = builder.toString();
+ }
+
+ for(int i = 0; i < getCount(); i++)
+ {
+ Properties properties = new Properties();
+ properties.setMessageId(UnsignedLong.valueOf(i));
+ properties.setReplyTo(responseReceiver.getAddress());
+
+ AmqpValue amqpValue = new AmqpValue(message);
+ Section[] sections = { new Header() , properties, amqpValue};
+ final Message message1 = new Message(Arrays.asList(sections));
+
+ s.send(message1, txn);
+
+ Message responseMessage = responseReceiver.receive(false);
+ if(responseMessage != null)
+ {
+ responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn);
+ received++;
+ }
+ }
+ }
+
+ if(txn != null)
+ {
+ txn.commit();
+ }
+
+
+ while(received < getCount())
+ {
+ Message responseMessage = responseReceiver.receive();
+ responseReceiver.acknowledge(responseMessage.getDeliveryTag());
+ received++;
+ }
+
+
+
+
+ s.close();
+ session.close();
+ conn.close();
+
+ if(session2 != null)
+ {
+ session2.close();
+ conn2.close();
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ }
+
+ protected boolean hasSingleLinkPerConnectionMode()
+ {
+ return true;
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
index b4ae16ab3f..36aadc7851 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
@@ -1,235 +1,235 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
-import java.util.Arrays;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-public class Send extends Util
-{
- private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:";
- private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
-
-
- public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, ConnectionException
- {
- new Send(args).run();
- }
-
-
- public Send(final String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasSubjectOption()
- {
- return true;
- }
-
- public void run()
- {
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- try
- {
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName());
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- if(!useStdIn())
- {
- if(getArgs().length <= 2)
- {
- if(getArgs().length == 2)
- {
- message = getArgs()[1];
- }
- for(int i = 0; i < getCount(); i++)
- {
-
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- if(getSubject() != null)
- {
- properties.setSubject(getSubject());
- }
- Section bodySection;
- byte[] bytes = (message + " " + i).getBytes();
- if(bytes.length < getMessageSize())
- {
- byte[] origBytes = bytes;
- bytes = new byte[getMessageSize()];
- System.arraycopy(origBytes,0,bytes,0,origBytes.length);
- for(int x = origBytes.length; x < bytes.length; x++)
- {
- bytes[x] = (byte) '.';
- }
- bodySection = new Data(new Binary(bytes));
- }
- else
- {
- bodySection = new AmqpValue(message + " " + i);
- }
-
- Section[] sections = {properties, bodySection};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
- }
- }
- else
- {
- for(int i = 1; i < getArgs().length; i++)
- {
- s.send(new Message(getArgs()[i]), txn);
- }
-
- }
- }
- else
- {
- LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in));
-
-
- try
- {
- while((message = buf.readLine()) != null)
- {
- s.send(new Message(message), txn);
- }
- }
- catch (IOException e)
- {
- // TODO
- e.printStackTrace();
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
- s.close();
-
- session.close();
- conn.close();
- }
- catch (Exception e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.util.Arrays;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+public class Send extends Util
+{
+ private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:";
+ private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
+
+
+ public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, ConnectionException
+ {
+ new Send(args).run();
+ }
+
+
+ public Send(final String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasSubjectOption()
+ {
+ return true;
+ }
+
+ public void run()
+ {
+
+ final String queue = getArgs()[0];
+
+ String message = "";
+
+ try
+ {
+ Connection conn = newConnection();
+
+ Session session = conn.createSession();
+
+
+ Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName());
+
+ Transaction txn = null;
+
+ if(useTran())
+ {
+ txn = session.createSessionLocalTransaction();
+ }
+
+ if(!useStdIn())
+ {
+ if(getArgs().length <= 2)
+ {
+ if(getArgs().length == 2)
+ {
+ message = getArgs()[1];
+ }
+ for(int i = 0; i < getCount(); i++)
+ {
+
+ Properties properties = new Properties();
+ properties.setMessageId(UnsignedLong.valueOf(i));
+ if(getSubject() != null)
+ {
+ properties.setSubject(getSubject());
+ }
+ Section bodySection;
+ byte[] bytes = (message + " " + i).getBytes();
+ if(bytes.length < getMessageSize())
+ {
+ byte[] origBytes = bytes;
+ bytes = new byte[getMessageSize()];
+ System.arraycopy(origBytes,0,bytes,0,origBytes.length);
+ for(int x = origBytes.length; x < bytes.length; x++)
+ {
+ bytes[x] = (byte) '.';
+ }
+ bodySection = new Data(new Binary(bytes));
+ }
+ else
+ {
+ bodySection = new AmqpValue(message + " " + i);
+ }
+
+ Section[] sections = {properties, bodySection};
+ final Message message1 = new Message(Arrays.asList(sections));
+
+ s.send(message1, txn);
+ }
+ }
+ else
+ {
+ for(int i = 1; i < getArgs().length; i++)
+ {
+ s.send(new Message(getArgs()[i]), txn);
+ }
+
+ }
+ }
+ else
+ {
+ LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in));
+
+
+ try
+ {
+ while((message = buf.readLine()) != null)
+ {
+ s.send(new Message(message), txn);
+ }
+ }
+ catch (IOException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ }
+
+ if(txn != null)
+ {
+ txn.commit();
+ }
+
+ s.close();
+
+ session.close();
+ conn.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
index 4c5ffeb177..f66a33b978 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
@@ -1,412 +1,412 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.net.ssl.SSLSocketFactory;
-import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
-import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-
-public class Connection
-{
- private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
- private static final int MAX_FRAME_SIZE = 65536;
-
- private String _address;
- private ConnectionEndpoint _conn;
- private int _sessionCount;
-
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password, String remoteHostname) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,new Container());
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container) throws ConnectionException
- {
- this(address,port,username,password,MAX_FRAME_SIZE,container);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,container, null);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final String remoteHost,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final String remoteHost,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
- }
-
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname, boolean ssl) throws ConnectionException
- {
-
- _address = address;
-
- try
- {
- final Socket s;
- if(ssl)
- {
- s = SSLSocketFactory.getDefault().createSocket(address, port);
- }
- else
- {
- s = new Socket(address, port);
- }
-
-
- Principal principal = username == null ? null : new Principal()
- {
-
- public String getName()
- {
- return username;
- }
- };
- _conn = new ConnectionEndpoint(container, principal, password);
- _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
- _conn.setRemoteAddress(s.getRemoteSocketAddress());
- _conn.setRemoteHostname(remoteHostname);
-
-
-
- ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
-
-
- final OutputStream outputStream = s.getOutputStream();
-
- ConnectionHandler.BytesSource src;
-
- if(_conn.requiresSASL())
- {
- ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
-
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)3,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
- new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
-
- _conn.setSaslFrameOutput(saslOut);
- }
- else
- {
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
- }
-
-
- ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
- Thread outputThread = new Thread(outputHandler);
- outputThread.setDaemon(true);
- outputThread.start();
- _conn.setFrameOutputHandler(out);
-
-
-
- final ConnectionHandler handler = new ConnectionHandler(_conn);
- final InputStream inputStream = s.getInputStream();
-
- Thread inputThread = new Thread(new Runnable()
- {
-
- public void run()
- {
- try
- {
- doRead(handler, inputStream);
- }
- finally
- {
- if(_conn.closedForInput() && _conn.closedForOutput())
- {
- try
- {
- s.close();
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- });
-
- inputThread.setDaemon(true);
- inputThread.start();
-
- _conn.open();
-
- }
- catch (IOException e)
- {
- throw new ConnectionException(e);
- }
-
-
- }
-
- private Connection(ConnectionEndpoint endpoint)
- {
- _conn = endpoint;
- }
-
-
- private void doRead(final AMQPTransport transport, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
- ByteBuffer bbuf = ByteBuffer.wrap(buf);
- final Object lock = new Object();
- transport.setInputStateChangeListener(new StateChangeListener(){
-
- public void onStateChange(final boolean active)
- {
- synchronized(lock)
- {
- lock.notifyAll();
- }
- }
- });
-
- try
- {
- int read;
- while((read = inputStream.read(buf)) != -1)
- {
- bbuf.position(0);
- bbuf.limit(read);
-
- while(bbuf.hasRemaining() && transport.isOpenForInput())
- {
- transport.processBytes(bbuf);
- }
-
-
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
-
- }
-
- public Session createSession() throws ConnectionException
- {
- checkNotClosed();
- Session session = new Session(this,String.valueOf(_sessionCount++));
- return session;
- }
-
- void checkNotClosed() throws ConnectionClosedException
- {
- if(getEndpoint().isClosed())
- {
- throw new ConnectionClosedException(getEndpoint().getRemoteError());
- }
- }
-
- public ConnectionEndpoint getEndpoint()
- {
- return _conn;
- }
-
- public void awaitOpen()
- {
- synchronized(getEndpoint().getLock())
- {
- while(!getEndpoint().isOpen() && !getEndpoint().isClosed())
- {
- try
- {
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
-
- private void doRead(final ConnectionHandler handler, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
-
-
- try
- {
- int read;
- boolean done = false;
- while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
- {
- ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
- Binary b = new Binary(buf,0,read);
-
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
- }
- while(bbuf.hasRemaining() && !handler.isDone())
- {
- handler.parse(bbuf);
- }
-
-
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
- public void close()
- {
- _conn.close();
-
- synchronized (_conn.getLock())
- {
- while(!_conn.closedForInput())
- {
- try
- {
- _conn.getLock().wait();
- }
- catch (InterruptedException e)
- {
-
- }
- }
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+
+public class Connection
+{
+ private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
+ private static final int MAX_FRAME_SIZE = 65536;
+
+ private String _address;
+ private ConnectionEndpoint _conn;
+ private int _sessionCount;
+
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password, String remoteHostname) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize) throws ConnectionException
+ {
+ this(address,port,username,password,maxFrameSize,new Container());
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container) throws ConnectionException
+ {
+ this(address,port,username,password,MAX_FRAME_SIZE,container);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container) throws ConnectionException
+ {
+ this(address,port,username,password,maxFrameSize,container, null);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container,
+ final String remoteHostname) throws ConnectionException
+ {
+ this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container,
+ final boolean ssl) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final String remoteHost,
+ final boolean ssl) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container,
+ final String remoteHost,
+ final boolean ssl) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
+ }
+
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container,
+ final String remoteHostname, boolean ssl) throws ConnectionException
+ {
+
+ _address = address;
+
+ try
+ {
+ final Socket s;
+ if(ssl)
+ {
+ s = SSLSocketFactory.getDefault().createSocket(address, port);
+ }
+ else
+ {
+ s = new Socket(address, port);
+ }
+
+
+ Principal principal = username == null ? null : new Principal()
+ {
+
+ public String getName()
+ {
+ return username;
+ }
+ };
+ _conn = new ConnectionEndpoint(container, principal, password);
+ _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
+ _conn.setRemoteAddress(s.getRemoteSocketAddress());
+ _conn.setRemoteHostname(remoteHostname);
+
+
+
+ ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
+
+
+ final OutputStream outputStream = s.getOutputStream();
+
+ ConnectionHandler.BytesSource src;
+
+ if(_conn.requiresSASL())
+ {
+ ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
+
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)3,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
+ new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
+
+ _conn.setSaslFrameOutput(saslOut);
+ }
+ else
+ {
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
+ }
+
+
+ ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
+ Thread outputThread = new Thread(outputHandler);
+ outputThread.setDaemon(true);
+ outputThread.start();
+ _conn.setFrameOutputHandler(out);
+
+
+
+ final ConnectionHandler handler = new ConnectionHandler(_conn);
+ final InputStream inputStream = s.getInputStream();
+
+ Thread inputThread = new Thread(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ doRead(handler, inputStream);
+ }
+ finally
+ {
+ if(_conn.closedForInput() && _conn.closedForOutput())
+ {
+ try
+ {
+ s.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
+ });
+
+ inputThread.setDaemon(true);
+ inputThread.start();
+
+ _conn.open();
+
+ }
+ catch (IOException e)
+ {
+ throw new ConnectionException(e);
+ }
+
+
+ }
+
+ private Connection(ConnectionEndpoint endpoint)
+ {
+ _conn = endpoint;
+ }
+
+
+ private void doRead(final AMQPTransport transport, final InputStream inputStream)
+ {
+ byte[] buf = new byte[2<<15];
+ ByteBuffer bbuf = ByteBuffer.wrap(buf);
+ final Object lock = new Object();
+ transport.setInputStateChangeListener(new StateChangeListener(){
+
+ public void onStateChange(final boolean active)
+ {
+ synchronized(lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ });
+
+ try
+ {
+ int read;
+ while((read = inputStream.read(buf)) != -1)
+ {
+ bbuf.position(0);
+ bbuf.limit(read);
+
+ while(bbuf.hasRemaining() && transport.isOpenForInput())
+ {
+ transport.processBytes(bbuf);
+ }
+
+
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ public Session createSession() throws ConnectionException
+ {
+ checkNotClosed();
+ Session session = new Session(this,String.valueOf(_sessionCount++));
+ return session;
+ }
+
+ void checkNotClosed() throws ConnectionClosedException
+ {
+ if(getEndpoint().isClosed())
+ {
+ throw new ConnectionClosedException(getEndpoint().getRemoteError());
+ }
+ }
+
+ public ConnectionEndpoint getEndpoint()
+ {
+ return _conn;
+ }
+
+ public void awaitOpen()
+ {
+ synchronized(getEndpoint().getLock())
+ {
+ while(!getEndpoint().isOpen() && !getEndpoint().isClosed())
+ {
+ try
+ {
+ getEndpoint().getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ }
+
+ private void doRead(final ConnectionHandler handler, final InputStream inputStream)
+ {
+ byte[] buf = new byte[2<<15];
+
+
+ try
+ {
+ int read;
+ boolean done = false;
+ while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
+ {
+ ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
+ Binary b = new Binary(buf,0,read);
+
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
+ }
+ while(bbuf.hasRemaining() && !handler.isDone())
+ {
+ handler.parse(bbuf);
+ }
+
+
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public void close()
+ {
+ _conn.close();
+
+ synchronized (_conn.getLock())
+ {
+ while(!_conn.closedForInput())
+ {
+ try
+ {
+ _conn.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ }
+ }
+ }
+}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
index 7c1172898b..e8ac1de6c1 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
@@ -1,148 +1,148 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-public class Message
-{
- private Binary _deliveryTag;
- private List<Section> _payload = new ArrayList<Section>();
- private Boolean _resume;
- private boolean _settled;
- private DeliveryState _deliveryState;
- private Receiver _receiver;
-
-
- public Message()
- {
- }
-
- public Message(Collection<Section> sections)
- {
- _payload.addAll(sections);
- }
-
- public Message(Section section)
- {
- this(Collections.singletonList(section));
- }
-
- public Message(String message)
- {
- this(new AmqpValue(message));
- }
-
-
- public Binary getDeliveryTag()
- {
- return _deliveryTag;
- }
-
- public void setDeliveryTag(Binary deliveryTag)
- {
- _deliveryTag = deliveryTag;
- }
-
- public List<Section> getPayload()
- {
- return Collections.unmodifiableList(_payload);
- }
-
- private <T extends Section> T getSection(Class<T> clazz)
- {
- for(Section s : _payload)
- {
- if(clazz.isAssignableFrom(s.getClass()))
- {
- return (T) s;
- }
- }
- return null;
- }
-
- public ApplicationProperties getApplicationProperties()
- {
- return getSection(ApplicationProperties.class);
- }
-
- public Properties getProperties()
- {
- return getSection(Properties.class);
- }
-
- public Header getHeader()
- {
- return getSection(Header.class);
- }
-
-
- public void setResume(final Boolean resume)
- {
- _resume = resume;
- }
-
- public boolean isResume()
- {
- return Boolean.TRUE.equals(_resume);
- }
-
- public void setDeliveryState(DeliveryState state)
- {
- _deliveryState = state;
- }
-
- public DeliveryState getDeliveryState()
- {
- return _deliveryState;
- }
-
- public void setSettled(boolean settled)
- {
- _settled = settled;
- }
-
- public boolean getSettled()
- {
- return _settled;
- }
-
- public void setReceiver(final Receiver receiver)
- {
- _receiver = receiver;
- }
-
- public Receiver getReceiver()
- {
- return _receiver;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class Message
+{
+ private Binary _deliveryTag;
+ private List<Section> _payload = new ArrayList<Section>();
+ private Boolean _resume;
+ private boolean _settled;
+ private DeliveryState _deliveryState;
+ private Receiver _receiver;
+
+
+ public Message()
+ {
+ }
+
+ public Message(Collection<Section> sections)
+ {
+ _payload.addAll(sections);
+ }
+
+ public Message(Section section)
+ {
+ this(Collections.singletonList(section));
+ }
+
+ public Message(String message)
+ {
+ this(new AmqpValue(message));
+ }
+
+
+ public Binary getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+ public void setDeliveryTag(Binary deliveryTag)
+ {
+ _deliveryTag = deliveryTag;
+ }
+
+ public List<Section> getPayload()
+ {
+ return Collections.unmodifiableList(_payload);
+ }
+
+ private <T extends Section> T getSection(Class<T> clazz)
+ {
+ for(Section s : _payload)
+ {
+ if(clazz.isAssignableFrom(s.getClass()))
+ {
+ return (T) s;
+ }
+ }
+ return null;
+ }
+
+ public ApplicationProperties getApplicationProperties()
+ {
+ return getSection(ApplicationProperties.class);
+ }
+
+ public Properties getProperties()
+ {
+ return getSection(Properties.class);
+ }
+
+ public Header getHeader()
+ {
+ return getSection(Header.class);
+ }
+
+
+ public void setResume(final Boolean resume)
+ {
+ _resume = resume;
+ }
+
+ public boolean isResume()
+ {
+ return Boolean.TRUE.equals(_resume);
+ }
+
+ public void setDeliveryState(DeliveryState state)
+ {
+ _deliveryState = state;
+ }
+
+ public DeliveryState getDeliveryState()
+ {
+ return _deliveryState;
+ }
+
+ public void setSettled(boolean settled)
+ {
+ _settled = settled;
+ }
+
+ public boolean getSettled()
+ {
+ return _settled;
+ }
+
+ public void setReceiver(final Receiver receiver)
+ {
+ _receiver = receiver;
+ }
+
+ public Receiver getReceiver()
+ {
+ return _receiver;
+ }
+}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
index 596931088f..5175d1d847 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
@@ -1,615 +1,615 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
-
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class Receiver implements DeliveryStateHandler
-{
- private ReceivingLinkEndpoint _endpoint;
- private int _id;
- private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100);
- private Session _session;
-
- private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
- private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
- private MessageArrivalListener _messageArrivalListener;
- private org.apache.qpid.amqp_1_0.type.transport.Error _error;
- private Runnable _remoteErrorTask;
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode) throws ConnectionErrorException
- {
- this(session, linkName, target, source, ackMode, false);
- }
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode,
- boolean isDurable) throws ConnectionErrorException
- {
- this(session,linkName,target,source,ackMode,isDurable,null);
- }
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode,
- final boolean isDurable,
- final Map<Binary,Outcome> unsettled) throws ConnectionErrorException
- {
-
- session.getConnection().checkNotClosed();
- _session = session;
- if(isDurable)
- {
- source.setDurable(TerminusDurability.UNSETTLED_STATE);
- source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
- else if(source != null)
- {
- source.setDurable(TerminusDurability.NONE);
- source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
- }
- _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
- UnsignedInteger.ZERO);
-
- _endpoint.setDeliveryStateHandler(this);
-
- switch(ackMode)
- {
- case ALO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case EO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
-
- _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
- {
- @Override public void messageTransfer(final Transfer xfr)
- {
- _prefetchQueue.add(xfr);
- postPrefetchAction();
- }
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _error = detach.getError();
- if(detach.getError()!=null)
- {
- remoteError();
- }
- super.remoteDetached(endpoint, detach);
- }
- });
-
- _endpoint.setLocalUnsettled(unsettled);
- _endpoint.attach();
-
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isAttached() && !_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- if(_endpoint.getSource() == null)
- {
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- throw new ConnectionErrorException(getError());
- }
- else
- {
-
- }
- }
-
- private void remoteError()
- {
- if(_remoteErrorTask != null)
- {
- _remoteErrorTask.run();
- }
- }
-
- private void postPrefetchAction()
- {
- if(_messageArrivalListener != null)
- {
- _messageArrivalListener.messageArrived(this);
- }
- }
-
- public void setCredit(UnsignedInteger credit, boolean window)
- {
- _endpoint.setLinkCredit(credit);
- _endpoint.setCreditWindow(window);
-
- }
-
-
- public String getAddress()
- {
- return ((Source)_endpoint.getSource()).getAddress();
- }
-
- public Map getFilter()
- {
- return ((Source)_endpoint.getSource()).getFilter();
- }
-
- public Message receive()
- {
- return receive(-1L);
- }
-
- public Message receive(boolean wait)
- {
- return receive(wait ? -1L : 0L);
- }
-
- // 0 means no wait, -1 wait forever
- public Message receive(long wait)
- {
- Message m = null;
- Transfer xfr;
- long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
-
- while((xfr = receiveFromPrefetch(wait)) != null )
- {
-
- if(!Boolean.TRUE.equals(xfr.getAborted()))
- {
- Binary deliveryTag = xfr.getDeliveryTag();
- Boolean resume = xfr.getResume();
-
- List<Section> sections = new ArrayList<Section>();
- List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
- int totalSize = 0;
-
- boolean hasMore;
- do
- {
- hasMore = Boolean.TRUE.equals(xfr.getMore());
-
- ByteBuffer buf = xfr.getPayload();
-
- if(buf != null)
- {
-
- totalSize += buf.remaining();
-
- payloads.add(buf);
- }
- if(hasMore)
- {
- xfr = receiveFromPrefetch(-1l);
- if(xfr== null)
- {
- // TODO - this is wrong!!!!
- System.out.println("eeek");
- }
- }
- }
- while(hasMore && !Boolean.TRUE.equals(xfr.getAborted()));
-
- if(!Boolean.TRUE.equals(xfr.getAborted()))
- {
- ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
- for(ByteBuffer payload : payloads)
- {
- allPayload.put(payload);
- }
- allPayload.flip();
- SectionDecoder decoder = _session.getSectionDecoder();
-
- try
- {
- sections = decoder.parseAll(allPayload);
- }
- catch (AmqpErrorException e)
- {
- // todo - throw a sensible error
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- m = new Message(sections);
- m.setDeliveryTag(deliveryTag);
- m.setResume(resume);
- m.setReceiver(this);
- break;
- }
- }
-
- if(wait > 0L)
- {
- wait = endTime - System.currentTimeMillis();
- if(wait <=0L)
- {
- break;
- }
- }
- }
-
-
- return m;
-
- }
-
- private Transfer receiveFromPrefetch(long wait)
- {
- long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L);
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- Transfer xfr;
- while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached()
- && wait != 0)
- {
- try
- {
- if(wait>0L)
- {
- lock.wait(wait);
- }
- else if(wait<0L)
- {
- lock.wait();
- }
- }
- catch (InterruptedException e)
- {
- return null;
- }
- if(wait > 0L)
- {
- wait = endTime - System.currentTimeMillis();
- if(wait <= 0L)
- {
- break;
- }
- }
-
- }
- if(xfr != null)
- {
- _prefetchQueue.poll();
-
- }
-
- return xfr;
- }
-
- }
-
-
- public void release(final Message m)
- {
- release(m.getDeliveryTag());
- }
-
- public void release(Binary deliveryTag)
- {
- update(new Released(), deliveryTag, null, null);
- }
-
-
- public void modified(Binary tag)
- {
- final Modified outcome = new Modified();
- outcome.setDeliveryFailed(true);
-
- update(outcome, tag, null, null);
- }
-
- public void acknowledge(final Message m)
- {
- acknowledge(m.getDeliveryTag());
- }
-
- public void acknowledge(final Message m, SettledAction a)
- {
- acknowledge(m.getDeliveryTag(), a);
- }
-
-
- public void acknowledge(final Message m, Transaction txn)
- {
- acknowledge(m.getDeliveryTag(), txn);
- }
-
-
- public void acknowledge(final Binary deliveryTag)
- {
- acknowledge(deliveryTag, null, null);
- }
-
-
- public void acknowledge(final Binary deliveryTag, SettledAction a)
- {
- acknowledge(deliveryTag, null, a);
- }
-
- public void acknowledge(final Binary deliveryTag, final Transaction txn)
- {
- acknowledge(deliveryTag, txn, null);
- }
-
- public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- update(new Accepted(), deliveryTag, txn, action);
- }
-
- public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action)
- {
-
- DeliveryState state;
- if(txn != null)
- {
- TransactionalState txnState = new TransactionalState();
- txnState.setOutcome(outcome);
- txnState.setTxnId(txn.getTxnId());
- state = txnState;
- }
- else
- {
- state = (DeliveryState) outcome;
- }
- boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
- if(!(settled || action == null))
- {
- _unsettledMap.put(deliveryTag, action);
- }
-
- _endpoint.updateDisposition(deliveryTag,state, settled);
- }
-
- public Error getError()
- {
- return _error;
- }
-
- public void acknowledgeAll(Message m)
- {
- acknowledgeAll(m.getDeliveryTag());
- }
-
- public void acknowledgeAll(Binary deliveryTag)
- {
- acknowledgeAll(deliveryTag, null, null);
- }
-
- public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- updateAll(new Accepted(), deliveryTag, txn, action);
- }
-
- public void updateAll(Outcome outcome, Binary deliveryTag)
- {
- updateAll(outcome, deliveryTag, null, null);
- }
-
- public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- DeliveryState state;
-
- if(txn != null)
- {
- TransactionalState txnState = new TransactionalState();
- txnState.setOutcome(outcome);
- txnState.setTxnId(txn.getTxnId());
- state = txnState;
- }
- else
- {
- state = (DeliveryState) outcome;
- }
- boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
- if(!(settled || action == null))
- {
- _unsettledMap.put(deliveryTag, action);
- }
- _endpoint.updateAllDisposition(deliveryTag, state, settled);
- }
-
-
-
- public void close()
- {
- _endpoint.setTarget(null);
- _endpoint.close();
- Message msg;
- while((msg = receive(-1l)) != null)
- {
- release(msg);
- }
-
- }
-
-
- public void detach()
- {
- _endpoint.setTarget(null);
- _endpoint.detach();
- Message msg;
- while((msg = receive(-1l)) != null)
- {
- release(msg);
- }
-
- }
-
- public void drain()
- {
- _endpoint.drain();
- }
-
- /**
- * Waits for the receiver to drain or a message to be available to be received.
- * @return true if the receiver has been drained.
- */
- public boolean drainWait()
- {
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- try
- {
- while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
- {
- lock.wait();
- }
- }
- catch (InterruptedException e)
- {
- }
- }
- return _prefetchQueue.peek()==null && _endpoint.isDrained();
- }
-
- /**
- * Clears the receiver drain so that message delivery can resume.
- */
- public void clearDrain()
- {
- _endpoint.clearDrain();
- }
-
- public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
- {
- _endpoint.setLinkCredit(credit);
- _endpoint.setTransactionId(txn == null ? null : txn.getTxnId());
- _endpoint.setCreditWindow(false);
-
- }
-
- public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
- {
- if(Boolean.TRUE.equals(settled))
- {
- SettledAction action = _unsettledMap.remove(deliveryTag);
- if(action != null)
- {
- action.onSettled(deliveryTag);
- }
- }
- }
-
- public Map<Binary, Outcome> getRemoteUnsettled()
- {
- return _endpoint.getInitialUnsettledMap();
- }
-
-
- public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
- {
- synchronized(_endpoint.getLock())
- {
- _messageArrivalListener = messageArrivalListener;
- int prefetchSize = _prefetchQueue.size();
- for(int i = 0; i < prefetchSize; i++)
- {
- postPrefetchAction();
- }
- }
- }
-
- public Session getSession()
- {
- return _session;
- }
-
- public org.apache.qpid.amqp_1_0.type.Source getSource()
- {
- return _endpoint.getSource();
- }
-
- public static interface SettledAction
- {
- public void onSettled(Binary deliveryTag);
- }
-
-
- public interface MessageArrivalListener
- {
- void messageArrived(Receiver receiver);
- }
-
- public void setRemoteErrorListener(Runnable listener)
- {
- _remoteErrorTask = listener;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class Receiver implements DeliveryStateHandler
+{
+ private ReceivingLinkEndpoint _endpoint;
+ private int _id;
+ private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100);
+ private Session _session;
+
+ private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
+ private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
+ private MessageArrivalListener _messageArrivalListener;
+ private org.apache.qpid.amqp_1_0.type.transport.Error _error;
+ private Runnable _remoteErrorTask;
+
+ public Receiver(final Session session,
+ final String linkName,
+ final Target target,
+ final Source source,
+ final AcknowledgeMode ackMode) throws ConnectionErrorException
+ {
+ this(session, linkName, target, source, ackMode, false);
+ }
+
+ public Receiver(final Session session,
+ final String linkName,
+ final Target target,
+ final Source source,
+ final AcknowledgeMode ackMode,
+ boolean isDurable) throws ConnectionErrorException
+ {
+ this(session,linkName,target,source,ackMode,isDurable,null);
+ }
+
+ public Receiver(final Session session,
+ final String linkName,
+ final Target target,
+ final Source source,
+ final AcknowledgeMode ackMode,
+ final boolean isDurable,
+ final Map<Binary,Outcome> unsettled) throws ConnectionErrorException
+ {
+
+ session.getConnection().checkNotClosed();
+ _session = session;
+ if(isDurable)
+ {
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ }
+ else if(source != null)
+ {
+ source.setDurable(TerminusDurability.NONE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ }
+ _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
+ UnsignedInteger.ZERO);
+
+ _endpoint.setDeliveryStateHandler(this);
+
+ switch(ackMode)
+ {
+ case ALO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case AMO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case EO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+ break;
+
+ }
+
+ _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
+ {
+ @Override public void messageTransfer(final Transfer xfr)
+ {
+ _prefetchQueue.add(xfr);
+ postPrefetchAction();
+ }
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ _error = detach.getError();
+ if(detach.getError()!=null)
+ {
+ remoteError();
+ }
+ super.remoteDetached(endpoint, detach);
+ }
+ });
+
+ _endpoint.setLocalUnsettled(unsettled);
+ _endpoint.attach();
+
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isAttached() && !_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ if(_endpoint.getSource() == null)
+ {
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ throw new ConnectionErrorException(getError());
+ }
+ else
+ {
+
+ }
+ }
+
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
+ private void postPrefetchAction()
+ {
+ if(_messageArrivalListener != null)
+ {
+ _messageArrivalListener.messageArrived(this);
+ }
+ }
+
+ public void setCredit(UnsignedInteger credit, boolean window)
+ {
+ _endpoint.setLinkCredit(credit);
+ _endpoint.setCreditWindow(window);
+
+ }
+
+
+ public String getAddress()
+ {
+ return ((Source)_endpoint.getSource()).getAddress();
+ }
+
+ public Map getFilter()
+ {
+ return ((Source)_endpoint.getSource()).getFilter();
+ }
+
+ public Message receive()
+ {
+ return receive(-1L);
+ }
+
+ public Message receive(boolean wait)
+ {
+ return receive(wait ? -1L : 0L);
+ }
+
+ // 0 means no wait, -1 wait forever
+ public Message receive(long wait)
+ {
+ Message m = null;
+ Transfer xfr;
+ long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
+
+ while((xfr = receiveFromPrefetch(wait)) != null )
+ {
+
+ if(!Boolean.TRUE.equals(xfr.getAborted()))
+ {
+ Binary deliveryTag = xfr.getDeliveryTag();
+ Boolean resume = xfr.getResume();
+
+ List<Section> sections = new ArrayList<Section>();
+ List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
+ int totalSize = 0;
+
+ boolean hasMore;
+ do
+ {
+ hasMore = Boolean.TRUE.equals(xfr.getMore());
+
+ ByteBuffer buf = xfr.getPayload();
+
+ if(buf != null)
+ {
+
+ totalSize += buf.remaining();
+
+ payloads.add(buf);
+ }
+ if(hasMore)
+ {
+ xfr = receiveFromPrefetch(-1l);
+ if(xfr== null)
+ {
+ // TODO - this is wrong!!!!
+ System.out.println("eeek");
+ }
+ }
+ }
+ while(hasMore && !Boolean.TRUE.equals(xfr.getAborted()));
+
+ if(!Boolean.TRUE.equals(xfr.getAborted()))
+ {
+ ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
+ for(ByteBuffer payload : payloads)
+ {
+ allPayload.put(payload);
+ }
+ allPayload.flip();
+ SectionDecoder decoder = _session.getSectionDecoder();
+
+ try
+ {
+ sections = decoder.parseAll(allPayload);
+ }
+ catch (AmqpErrorException e)
+ {
+ // todo - throw a sensible error
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ m = new Message(sections);
+ m.setDeliveryTag(deliveryTag);
+ m.setResume(resume);
+ m.setReceiver(this);
+ break;
+ }
+ }
+
+ if(wait > 0L)
+ {
+ wait = endTime - System.currentTimeMillis();
+ if(wait <=0L)
+ {
+ break;
+ }
+ }
+ }
+
+
+ return m;
+
+ }
+
+ private Transfer receiveFromPrefetch(long wait)
+ {
+ long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L);
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ Transfer xfr;
+ while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached()
+ && wait != 0)
+ {
+ try
+ {
+ if(wait>0L)
+ {
+ lock.wait(wait);
+ }
+ else if(wait<0L)
+ {
+ lock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ if(wait > 0L)
+ {
+ wait = endTime - System.currentTimeMillis();
+ if(wait <= 0L)
+ {
+ break;
+ }
+ }
+
+ }
+ if(xfr != null)
+ {
+ _prefetchQueue.poll();
+
+ }
+
+ return xfr;
+ }
+
+ }
+
+
+ public void release(final Message m)
+ {
+ release(m.getDeliveryTag());
+ }
+
+ public void release(Binary deliveryTag)
+ {
+ update(new Released(), deliveryTag, null, null);
+ }
+
+
+ public void modified(Binary tag)
+ {
+ final Modified outcome = new Modified();
+ outcome.setDeliveryFailed(true);
+
+ update(outcome, tag, null, null);
+ }
+
+ public void acknowledge(final Message m)
+ {
+ acknowledge(m.getDeliveryTag());
+ }
+
+ public void acknowledge(final Message m, SettledAction a)
+ {
+ acknowledge(m.getDeliveryTag(), a);
+ }
+
+
+ public void acknowledge(final Message m, Transaction txn)
+ {
+ acknowledge(m.getDeliveryTag(), txn);
+ }
+
+
+ public void acknowledge(final Binary deliveryTag)
+ {
+ acknowledge(deliveryTag, null, null);
+ }
+
+
+ public void acknowledge(final Binary deliveryTag, SettledAction a)
+ {
+ acknowledge(deliveryTag, null, a);
+ }
+
+ public void acknowledge(final Binary deliveryTag, final Transaction txn)
+ {
+ acknowledge(deliveryTag, txn, null);
+ }
+
+ public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+ update(new Accepted(), deliveryTag, txn, action);
+ }
+
+ public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+
+ DeliveryState state;
+ if(txn != null)
+ {
+ TransactionalState txnState = new TransactionalState();
+ txnState.setOutcome(outcome);
+ txnState.setTxnId(txn.getTxnId());
+ state = txnState;
+ }
+ else
+ {
+ state = (DeliveryState) outcome;
+ }
+ boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
+
+ if(!(settled || action == null))
+ {
+ _unsettledMap.put(deliveryTag, action);
+ }
+
+ _endpoint.updateDisposition(deliveryTag,state, settled);
+ }
+
+ public Error getError()
+ {
+ return _error;
+ }
+
+ public void acknowledgeAll(Message m)
+ {
+ acknowledgeAll(m.getDeliveryTag());
+ }
+
+ public void acknowledgeAll(Binary deliveryTag)
+ {
+ acknowledgeAll(deliveryTag, null, null);
+ }
+
+ public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+ updateAll(new Accepted(), deliveryTag, txn, action);
+ }
+
+ public void updateAll(Outcome outcome, Binary deliveryTag)
+ {
+ updateAll(outcome, deliveryTag, null, null);
+ }
+
+ public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+ DeliveryState state;
+
+ if(txn != null)
+ {
+ TransactionalState txnState = new TransactionalState();
+ txnState.setOutcome(outcome);
+ txnState.setTxnId(txn.getTxnId());
+ state = txnState;
+ }
+ else
+ {
+ state = (DeliveryState) outcome;
+ }
+ boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
+
+ if(!(settled || action == null))
+ {
+ _unsettledMap.put(deliveryTag, action);
+ }
+ _endpoint.updateAllDisposition(deliveryTag, state, settled);
+ }
+
+
+
+ public void close()
+ {
+ _endpoint.setTarget(null);
+ _endpoint.close();
+ Message msg;
+ while((msg = receive(-1l)) != null)
+ {
+ release(msg);
+ }
+
+ }
+
+
+ public void detach()
+ {
+ _endpoint.setTarget(null);
+ _endpoint.detach();
+ Message msg;
+ while((msg = receive(-1l)) != null)
+ {
+ release(msg);
+ }
+
+ }
+
+ public void drain()
+ {
+ _endpoint.drain();
+ }
+
+ /**
+ * Waits for the receiver to drain or a message to be available to be received.
+ * @return true if the receiver has been drained.
+ */
+ public boolean drainWait()
+ {
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ try
+ {
+ while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
+ {
+ lock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ return _prefetchQueue.peek()==null && _endpoint.isDrained();
+ }
+
+ /**
+ * Clears the receiver drain so that message delivery can resume.
+ */
+ public void clearDrain()
+ {
+ _endpoint.clearDrain();
+ }
+
+ public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
+ {
+ _endpoint.setLinkCredit(credit);
+ _endpoint.setTransactionId(txn == null ? null : txn.getTxnId());
+ _endpoint.setCreditWindow(false);
+
+ }
+
+ public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+ {
+ if(Boolean.TRUE.equals(settled))
+ {
+ SettledAction action = _unsettledMap.remove(deliveryTag);
+ if(action != null)
+ {
+ action.onSettled(deliveryTag);
+ }
+ }
+ }
+
+ public Map<Binary, Outcome> getRemoteUnsettled()
+ {
+ return _endpoint.getInitialUnsettledMap();
+ }
+
+
+ public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
+ {
+ synchronized(_endpoint.getLock())
+ {
+ _messageArrivalListener = messageArrivalListener;
+ int prefetchSize = _prefetchQueue.size();
+ for(int i = 0; i < prefetchSize; i++)
+ {
+ postPrefetchAction();
+ }
+ }
+ }
+
+ public Session getSession()
+ {
+ return _session;
+ }
+
+ public org.apache.qpid.amqp_1_0.type.Source getSource()
+ {
+ return _endpoint.getSource();
+ }
+
+ public static interface SettledAction
+ {
+ public void onSettled(Binary deliveryTag);
+ }
+
+
+ public interface MessageArrivalListener
+ {
+ void messageArrived(Receiver receiver);
+ }
+
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
+}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
index 0feaa48805..851d02a798 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
@@ -1,452 +1,452 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.Source;
-import org.apache.qpid.amqp_1_0.type.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class Sender implements DeliveryStateHandler
-{
- private SendingLinkEndpoint _endpoint;
- private int _id;
- private Session _session;
- private int _windowSize;
- private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
- private boolean _closed;
- private Error _error;
- private Runnable _remoteErrorTask;
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, false);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- boolean synchronous)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
- }
-
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, AcknowledgeMode.ALO);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
- source.setAddress(sourceAddr);
- return source;
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
- target.setAddress(targetAddr);
- if(isDurable)
- {
- target.setDurable(TerminusDurability.UNSETTLED_STATE);
- target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
- return target;
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
-
- _session = session;
- session.getConnection().checkNotClosed();
- _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
- source, target, unsettled);
-
-
- switch(mode)
- {
- case ALO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
- break;
- case EO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
- _endpoint.setDeliveryStateHandler(this);
- _endpoint.attach();
- _windowSize = window;
-
- synchronized(_endpoint.getLock())
- {
- while(!(_endpoint.isAttached() || _endpoint.isDetached()))
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderCreationException(e);
- }
- }
- if(_endpoint.getTarget()== null)
- {
- throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
- };
- }
-
- _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
- {
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _error = detach.getError();
- if(_error != null)
- {
- remoteError();
- }
- super.remoteDetached(endpoint, detach);
- }
- });
- }
-
- public Source getSource()
- {
- return _endpoint.getSource();
- }
-
- public Target getTarget()
- {
- return _endpoint.getTarget();
- }
-
- public void send(Message message) throws LinkDetachedException
- {
- send(message, null, null);
- }
-
- public void send(Message message, final OutcomeAction action) throws LinkDetachedException
- {
- send(message, null, action);
- }
-
- public void send(Message message, final Transaction txn) throws LinkDetachedException
- {
- send(message, txn, null);
- }
-
- public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
- {
-
- List<Section> sections = message.getPayload();
-
- Transfer xfr = new Transfer();
-
- if(sections != null && !sections.isEmpty())
- {
- SectionEncoder encoder = _session.getSectionEncoder();
- encoder.reset();
-
- int sectionNumber = 0;
- for(Section section : sections)
- {
- encoder.encodeObject(section);
- }
-
-
- Binary encoding = encoder.getEncoding();
- ByteBuffer payload = encoding.asByteBuffer();
- xfr.setPayload(payload);
- }
- if(message.getDeliveryTag() == null)
- {
- message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
- }
- if(message.isResume())
- {
- xfr.setResume(Boolean.TRUE);
- }
- if(message.getDeliveryState() != null)
- {
- xfr.setState(message.getDeliveryState());
- }
-
- xfr.setDeliveryTag(message.getDeliveryTag());
- //xfr.setSettled(_windowSize ==0);
- if(txn != null)
- {
- xfr.setSettled(false);
- TransactionalState deliveryState = new TransactionalState();
- deliveryState.setTxnId(txn.getTxnId());
- xfr.setState(deliveryState);
- }
- else
- {
- xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
- }
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- if(_endpoint.isDetached())
- {
- throw new LinkDetachedException(_error);
- }
- if(action != null)
- {
- _outcomeActions.put(message.getDeliveryTag(), action);
- }
- _endpoint.transfer(xfr);
- //TODO - rationalise sending of flows
- // _endpoint.sendFlow();
- }
-
- if(_windowSize != 0)
- {
- synchronized(lock)
- {
-
-
- while(_endpoint.getUnsettledCount() >= _windowSize)
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
-
-
- }
-
- public void close() throws SenderClosingException
- {
-
- if(_windowSize != 0)
- {
- synchronized(_endpoint.getLock())
- {
-
-
- while(_endpoint.getUnsettledCount() > 0)
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
- _session.removeSender(this);
- _endpoint.setSource(null);
- _endpoint.detach();
- _closed = true;
-
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderClosingException(e);
- }
- }
- }
- }
-
- public boolean isClosed()
- {
- return _closed;
- }
-
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
- {
- if(state instanceof Outcome)
- {
- OutcomeAction action;
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
- action.onOutcome(deliveryTag, (Outcome) state);
- }
- if(!Boolean.TRUE.equals(settled))
- {
- _endpoint.updateDisposition(deliveryTag, state, true);
- }
- }
- else if(state instanceof TransactionalState)
- {
- OutcomeAction action;
-
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
- action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
- }
-
- }
- }
-
- public SendingLinkEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public Map<Binary, DeliveryState> getRemoteUnsettled()
- {
- return _endpoint.getInitialUnsettledMap();
- }
-
- public Session getSession()
- {
- return _session;
- }
-
-
- private void remoteError()
- {
- if(_remoteErrorTask != null)
- {
- _remoteErrorTask.run();
- }
- }
-
-
- public void setRemoteErrorListener(Runnable listener)
- {
- _remoteErrorTask = listener;
- }
-
- public Error getError()
- {
- return _error;
- }
-
- public class SenderCreationException extends Exception
- {
- public SenderCreationException(Throwable e)
- {
- super(e);
- }
-
- public SenderCreationException(String e)
- {
- super(e);
-
- }
- }
-
- public class SenderClosingException extends Exception
- {
- public SenderClosingException(Throwable e)
- {
- super(e);
- }
- }
-
- public static interface OutcomeAction
- {
- public void onOutcome(Binary deliveryTag, Outcome outcome);
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class Sender implements DeliveryStateHandler
+{
+ private SendingLinkEndpoint _endpoint;
+ private int _id;
+ private Session _session;
+ private int _windowSize;
+ private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
+ private boolean _closed;
+ private Error _error;
+ private Runnable _remoteErrorTask;
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, false);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ boolean synchronous)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window) throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
+ }
+
+
+ public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+ int window) throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, target, source, window, AcknowledgeMode.ALO);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window, AcknowledgeMode mode)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, window, mode, null);
+ }
+
+ public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+ int window, AcknowledgeMode mode)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, target, source, window, mode, null);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
+ throws SenderCreationException, ConnectionClosedException
+ {
+ this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
+ }
+
+ private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
+ {
+ org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
+ source.setAddress(sourceAddr);
+ return source;
+ }
+
+ private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
+ {
+ org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
+ target.setAddress(targetAddr);
+ if(isDurable)
+ {
+ target.setDurable(TerminusDurability.UNSETTLED_STATE);
+ target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ }
+ return target;
+ }
+
+ public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+ int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+ throws SenderCreationException, ConnectionClosedException
+ {
+
+ _session = session;
+ session.getConnection().checkNotClosed();
+ _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
+ source, target, unsettled);
+
+
+ switch(mode)
+ {
+ case ALO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case AMO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+ break;
+ case EO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+ break;
+
+ }
+ _endpoint.setDeliveryStateHandler(this);
+ _endpoint.attach();
+ _windowSize = window;
+
+ synchronized(_endpoint.getLock())
+ {
+ while(!(_endpoint.isAttached() || _endpoint.isDetached()))
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new SenderCreationException(e);
+ }
+ }
+ if(_endpoint.getTarget()== null)
+ {
+ throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
+ };
+ }
+
+ _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
+ {
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ _error = detach.getError();
+ if(_error != null)
+ {
+ remoteError();
+ }
+ super.remoteDetached(endpoint, detach);
+ }
+ });
+ }
+
+ public Source getSource()
+ {
+ return _endpoint.getSource();
+ }
+
+ public Target getTarget()
+ {
+ return _endpoint.getTarget();
+ }
+
+ public void send(Message message) throws LinkDetachedException
+ {
+ send(message, null, null);
+ }
+
+ public void send(Message message, final OutcomeAction action) throws LinkDetachedException
+ {
+ send(message, null, action);
+ }
+
+ public void send(Message message, final Transaction txn) throws LinkDetachedException
+ {
+ send(message, txn, null);
+ }
+
+ public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
+ {
+
+ List<Section> sections = message.getPayload();
+
+ Transfer xfr = new Transfer();
+
+ if(sections != null && !sections.isEmpty())
+ {
+ SectionEncoder encoder = _session.getSectionEncoder();
+ encoder.reset();
+
+ int sectionNumber = 0;
+ for(Section section : sections)
+ {
+ encoder.encodeObject(section);
+ }
+
+
+ Binary encoding = encoder.getEncoding();
+ ByteBuffer payload = encoding.asByteBuffer();
+ xfr.setPayload(payload);
+ }
+ if(message.getDeliveryTag() == null)
+ {
+ message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
+ }
+ if(message.isResume())
+ {
+ xfr.setResume(Boolean.TRUE);
+ }
+ if(message.getDeliveryState() != null)
+ {
+ xfr.setState(message.getDeliveryState());
+ }
+
+ xfr.setDeliveryTag(message.getDeliveryTag());
+ //xfr.setSettled(_windowSize ==0);
+ if(txn != null)
+ {
+ xfr.setSettled(false);
+ TransactionalState deliveryState = new TransactionalState();
+ deliveryState.setTxnId(txn.getTxnId());
+ xfr.setState(deliveryState);
+ }
+ else
+ {
+ xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
+ }
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ if(_endpoint.isDetached())
+ {
+ throw new LinkDetachedException(_error);
+ }
+ if(action != null)
+ {
+ _outcomeActions.put(message.getDeliveryTag(), action);
+ }
+ _endpoint.transfer(xfr);
+ //TODO - rationalise sending of flows
+ // _endpoint.sendFlow();
+ }
+
+ if(_windowSize != 0)
+ {
+ synchronized(lock)
+ {
+
+
+ while(_endpoint.getUnsettledCount() >= _windowSize)
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ }
+
+
+ }
+
+ public void close() throws SenderClosingException
+ {
+
+ if(_windowSize != 0)
+ {
+ synchronized(_endpoint.getLock())
+ {
+
+
+ while(_endpoint.getUnsettledCount() > 0)
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ }
+ _session.removeSender(this);
+ _endpoint.setSource(null);
+ _endpoint.detach();
+ _closed = true;
+
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new SenderClosingException(e);
+ }
+ }
+ }
+ }
+
+ public boolean isClosed()
+ {
+ return _closed;
+ }
+
+ public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ {
+ if(state instanceof Outcome)
+ {
+ OutcomeAction action;
+ if((action = _outcomeActions.remove(deliveryTag)) != null)
+ {
+ action.onOutcome(deliveryTag, (Outcome) state);
+ }
+ if(!Boolean.TRUE.equals(settled))
+ {
+ _endpoint.updateDisposition(deliveryTag, state, true);
+ }
+ }
+ else if(state instanceof TransactionalState)
+ {
+ OutcomeAction action;
+
+ if((action = _outcomeActions.remove(deliveryTag)) != null)
+ {
+ action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
+ }
+
+ }
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
+ }
+
+ public Map<Binary, DeliveryState> getRemoteUnsettled()
+ {
+ return _endpoint.getInitialUnsettledMap();
+ }
+
+ public Session getSession()
+ {
+ return _session;
+ }
+
+
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
+
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
+
+ public Error getError()
+ {
+ return _error;
+ }
+
+ public class SenderCreationException extends Exception
+ {
+ public SenderCreationException(Throwable e)
+ {
+ super(e);
+ }
+
+ public SenderCreationException(String e)
+ {
+ super(e);
+
+ }
+ }
+
+ public class SenderClosingException extends Exception
+ {
+ public SenderClosingException(Throwable e)
+ {
+ super(e);
+ }
+ }
+
+ public static interface OutcomeAction
+ {
+ public void onOutcome(Binary deliveryTag, Outcome outcome);
+ }
+}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
index 182d904a9c..79ed3b4457 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
@@ -1,384 +1,384 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionState;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class Session
-{
- private SessionEndpoint _endpoint;
- private List<Receiver> _receivers = new ArrayList<Receiver>();
- private List<Sender> _senders = new ArrayList<Sender>();
- private SectionEncoder _sectionEncoder;
- private SectionDecoder _sectionDecoder;
- private TransactionController _sessionLocalTC;
- private Connection _connection;
-
- public Session(final Connection connection, String name)
- {
- _connection = connection;
- _endpoint = connection.getEndpoint().createSession(name);
- _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
- _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
- }
-
-
- public synchronized Sender createSender(final String targetName)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, false);
- }
-
- public synchronized Sender createSender(final String targetName, boolean synchronous)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
-
- final String sourceName = UUID.randomUUID().toString();
- return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous);
-
- }
-
- public synchronized Sender createSender(final String targetName, int window)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- final String sourceName = UUID.randomUUID().toString();
- return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
-
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
-
- return createSender(targetName, window, mode, null);
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, window, mode, linkName, null);
- }
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, window, mode, linkName, false, unsettled);
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
- targetName, null, window, mode, isDurable, unsettled);
-
- }
-
-
- public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
- }
-
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName, isDurable);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
- }
-
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
- }
-
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
- }
-
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, ackMode, null);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr,mode, ackMode, linkName, false);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
- }
-
- public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
-
- final Target target = new Target();
- final Source source = new Source();
- source.setAddress(sourceAddr);
- source.setDistributionMode(mode);
- source.setFilter(filters);
-
- if(linkName == null)
- {
- linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
- }
-
- final Receiver receiver =
- new Receiver(this, linkName,
- target, source, ackMode, isDurable, unsettled);
- _receivers.add(receiver);
-
- return receiver;
-
- }
-
- public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, StdDistMode.COPY);
- }
-
- public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, StdDistMode.MOVE);
- }
-
- public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
- {
- Source source = new Source();
- source.setDynamic(true);
-
- final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
- source, AcknowledgeMode.ALO);
- _receivers.add(receiver);
- return receiver;
- }
-
- public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
- {
- Target target = new Target();
- target.setDynamic(true);
-
- final Sender sender;
- sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
- new Source(), 0, AcknowledgeMode.ALO);
- _senders.add(sender);
- return sender;
- }
-
-
-
- public SessionEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public synchronized void close()
- {
- try
- {
- for(Sender sender : new ArrayList<Sender>(_senders))
- {
- sender.close();
- }
- for(Receiver receiver : new ArrayList<Receiver>(_receivers))
- {
- receiver.detach();
- }
- if(_sessionLocalTC != null)
- {
- _sessionLocalTC.close();
- }
- _endpoint.end();
- }
- catch (Sender.SenderClosingException e)
- {
-// TODO
- e.printStackTrace();
- }
-
- //TODO
-
- }
-
- void removeSender(Sender sender)
- {
- _senders.remove(sender);
- }
-
- void removeReceiver(Receiver receiver)
- {
- _receivers.remove(receiver);
- }
-
- public SectionEncoder getSectionEncoder()
- {
- return _sectionEncoder;
- }
-
- public SectionDecoder getSectionDecoder()
- {
- return _sectionDecoder;
- }
-
-
- public Transaction createSessionLocalTransaction()
- {
- TransactionController localController = getSessionLocalTransactionController();
- return localController.beginTransaction();
- }
-
- private TransactionController getSessionLocalTransactionController()
- {
- if(_sessionLocalTC == null)
- {
- _sessionLocalTC = createSessionLocalTransactionController();
- }
- return _sessionLocalTC;
- }
-
- private TransactionController createSessionLocalTransactionController()
- {
- String name = "txnControllerLink";
- SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
- TxnCapability.MULTI_TXNS_PER_SSN);
- tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- tcLinkEndpoint.attach();
- return new TransactionController(this, tcLinkEndpoint);
- }
-
-
- public Message receive()
- {
- while(getEndpoint().getState() == SessionState.ACTIVE)
- {
- synchronized (getEndpoint().getLock())
- {
- try
- {
- for(Receiver r : _receivers)
- {
- Message m = r.receive(false);
- if(m != null)
- return m;
- }
- wait();
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- return null;
- }
-
- public Connection getConnection()
- {
- return _connection;
- }
-
- public void awaitActive()
- {
- synchronized(getEndpoint().getLock())
- {
- while(!getEndpoint().isEnded() && !getEndpoint().isActive())
- {
- try
- {
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionState;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class Session
+{
+ private SessionEndpoint _endpoint;
+ private List<Receiver> _receivers = new ArrayList<Receiver>();
+ private List<Sender> _senders = new ArrayList<Sender>();
+ private SectionEncoder _sectionEncoder;
+ private SectionDecoder _sectionDecoder;
+ private TransactionController _sessionLocalTC;
+ private Connection _connection;
+
+ public Session(final Connection connection, String name)
+ {
+ _connection = connection;
+ _endpoint = connection.getEndpoint().createSession(name);
+ _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+ _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+ }
+
+
+ public synchronized Sender createSender(final String targetName)
+ throws Sender.SenderCreationException, ConnectionClosedException
+ {
+ return createSender(targetName, false);
+ }
+
+ public synchronized Sender createSender(final String targetName, boolean synchronous)
+ throws Sender.SenderCreationException, ConnectionClosedException
+ {
+
+ final String sourceName = UUID.randomUUID().toString();
+ return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous);
+
+ }
+
+ public synchronized Sender createSender(final String targetName, int window)
+ throws Sender.SenderCreationException, ConnectionClosedException
+ {
+ final String sourceName = UUID.randomUUID().toString();
+ return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
+
+ }
+
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode)
+ throws Sender.SenderCreationException, ConnectionClosedException
+ {
+
+ return createSender(targetName, window, mode, null);
+ }
+
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
+ throws Sender.SenderCreationException, ConnectionClosedException
+ {
+ return createSender(targetName, window, mode, linkName, null);
+ }
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled)
+ throws Sender.SenderCreationException, ConnectionClosedException
+ {
+ return createSender(targetName, window, mode, linkName, false, unsettled);
+ }
+
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
+ boolean isDurable, Map<Binary, Outcome> unsettled)
+ throws Sender.SenderCreationException, ConnectionClosedException
+ {
+ return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
+ targetName, null, window, mode, isDurable, unsettled);
+
+ }
+
+
+ public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
+ }
+
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
+ throws ConnectionErrorException
+ {
+ return createReceiver(queue, null, mode);
+ }
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
+ throws ConnectionErrorException
+ {
+ return createReceiver(queue, null, mode, linkName);
+ }
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
+ throws ConnectionErrorException
+ {
+ return createReceiver(queue, null, mode, linkName, isDurable);
+ }
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
+ Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+ throws ConnectionErrorException
+ {
+ return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
+ }
+
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
+ boolean isDurable, Map<Binary, Outcome> unsettled)
+ throws ConnectionErrorException
+ {
+ return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
+ }
+
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
+ throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
+ }
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
+ throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
+ }
+
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode)
+ throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr, mode, ackMode, null);
+ }
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode, String linkName)
+ throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr,mode, ackMode, linkName, false);
+ }
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode, String linkName, boolean isDurable)
+ throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
+ }
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+ Map<Binary, Outcome> unsettled)
+ throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
+ }
+
+ public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+ Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+ throws ConnectionErrorException
+ {
+
+ final Target target = new Target();
+ final Source source = new Source();
+ source.setAddress(sourceAddr);
+ source.setDistributionMode(mode);
+ source.setFilter(filters);
+
+ if(linkName == null)
+ {
+ linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
+ }
+
+ final Receiver receiver =
+ new Receiver(this, linkName,
+ target, source, ackMode, isDurable, unsettled);
+ _receivers.add(receiver);
+
+ return receiver;
+
+ }
+
+ public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr, StdDistMode.COPY);
+ }
+
+ public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
+ {
+ return createReceiver(sourceAddr, StdDistMode.MOVE);
+ }
+
+ public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
+ {
+ Source source = new Source();
+ source.setDynamic(true);
+
+ final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
+ source, AcknowledgeMode.ALO);
+ _receivers.add(receiver);
+ return receiver;
+ }
+
+ public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
+ {
+ Target target = new Target();
+ target.setDynamic(true);
+
+ final Sender sender;
+ sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
+ new Source(), 0, AcknowledgeMode.ALO);
+ _senders.add(sender);
+ return sender;
+ }
+
+
+
+ public SessionEndpoint getEndpoint()
+ {
+ return _endpoint;
+ }
+
+ public synchronized void close()
+ {
+ try
+ {
+ for(Sender sender : new ArrayList<Sender>(_senders))
+ {
+ sender.close();
+ }
+ for(Receiver receiver : new ArrayList<Receiver>(_receivers))
+ {
+ receiver.detach();
+ }
+ if(_sessionLocalTC != null)
+ {
+ _sessionLocalTC.close();
+ }
+ _endpoint.end();
+ }
+ catch (Sender.SenderClosingException e)
+ {
+// TODO
+ e.printStackTrace();
+ }
+
+ //TODO
+
+ }
+
+ void removeSender(Sender sender)
+ {
+ _senders.remove(sender);
+ }
+
+ void removeReceiver(Receiver receiver)
+ {
+ _receivers.remove(receiver);
+ }
+
+ public SectionEncoder getSectionEncoder()
+ {
+ return _sectionEncoder;
+ }
+
+ public SectionDecoder getSectionDecoder()
+ {
+ return _sectionDecoder;
+ }
+
+
+ public Transaction createSessionLocalTransaction()
+ {
+ TransactionController localController = getSessionLocalTransactionController();
+ return localController.beginTransaction();
+ }
+
+ private TransactionController getSessionLocalTransactionController()
+ {
+ if(_sessionLocalTC == null)
+ {
+ _sessionLocalTC = createSessionLocalTransactionController();
+ }
+ return _sessionLocalTC;
+ }
+
+ private TransactionController createSessionLocalTransactionController()
+ {
+ String name = "txnControllerLink";
+ SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
+ TxnCapability.MULTI_TXNS_PER_SSN);
+ tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ tcLinkEndpoint.attach();
+ return new TransactionController(this, tcLinkEndpoint);
+ }
+
+
+ public Message receive()
+ {
+ while(getEndpoint().getState() == SessionState.ACTIVE)
+ {
+ synchronized (getEndpoint().getLock())
+ {
+ try
+ {
+ for(Receiver r : _receivers)
+ {
+ Message m = r.receive(false);
+ if(m != null)
+ return m;
+ }
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ return null;
+ }
+
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+
+ public void awaitActive()
+ {
+ synchronized(getEndpoint().getLock())
+ {
+ while(!getEndpoint().isEnded() && !getEndpoint().isActive())
+ {
+ try
+ {
+ getEndpoint().getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
+}