diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-09-25 15:15:18 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-09-25 15:15:18 +0000 |
| commit | 3bdd4780e35b4454477cf423b31ad6915df357fa (patch) | |
| tree | 2e29ad7a4be731a503869d80d4c8a63e203bd2af /qpid/java | |
| parent | 52c2bdf4d12655bf341151ed957bea5656b9723d (diff) | |
| download | qpid-python-3bdd4780e35b4454477cf423b31ad6915df357fa.tar.gz | |
NO-JIRA : set svn:eol-style to native
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1526202 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
8 files changed, 2913 insertions, 2913 deletions
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java index 5e77b7097c..26b0d80783 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java @@ -1,431 +1,431 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-public class Demo extends Util
-{
- private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:";
- private static final String OPCODE = "opcode";
- private static final String ACTION = "action";
- private static final String MESSAGE_ID = "message-id";
- private static final String VENDOR = "vendor";
- private static final String LOG = "log";
- private static final String RECEIVED = "received";
- private static final String TEST = "test";
- private static final String APACHE = "apache";
- private static final String SENT = "sent";
- private static final String LINK_REF = "link-ref";
- private static final String HOST = "host";
- private static final String PORT = "port";
- private static final String SASL_USER = "sasl-user";
- private static final String SASL_PASSWORD = "sasl-password";
- private static final String ROLE = "role";
- private static final String ADDRESS = "address";
- private static final String SENDER = "sender";
- private static final String SEND_MESSAGE = "send-message";
- private static final String ANNOUNCE = "announce";
- private static final String MESSAGE_VENDOR = "message-vendor";
- private static final String CREATE_LINK = "create-link";
-
- public static void main(String[] args)
- {
- new Demo(args).run();
- }
-
- public Demo(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return false;
- }
-
- public void run()
- {
-
- try
- {
-
- final String vendor = getArgs()[0];
- final String queue = "control";
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
-
- Receiver responseReceiver;
-
- responseReceiver = session.createTemporaryQueueReceiver();
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode());
-
-
- Properties properties = new Properties();
- properties.setMessageId(java.util.UUID.randomUUID());
- properties.setReplyTo(responseReceiver.getAddress());
-
- HashMap appPropMap = new HashMap();
- ApplicationProperties appProperties = new ApplicationProperties(appPropMap);
-
- appPropMap.put(OPCODE, ANNOUNCE);
- appPropMap.put(VENDOR, vendor);
- appPropMap.put(ADDRESS,responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { properties, appProperties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- try
- {
- s.send(message1);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
- Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
-
-
- boolean done = false;
-
- while(!done)
- {
- boolean wait = true;
- Message m = responseReceiver.receive(false);
- if(m != null)
- {
- List<Section> payload = m.getPayload();
- wait = false;
- ApplicationProperties props = m.getApplicationProperties();
- Map map = props.getValue();
- String op = (String) map.get(OPCODE);
- if("reset".equals(op))
- {
- for(Sender sender : sendingLinks.values())
- {
- try
- {
- sender.close();
- Session session1 = sender.getSession();
- session1.close();
- session1.getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- for(Receiver receiver : receivingLinks.values())
- {
- try
- {
- receiver.close();
- receiver.getSession().close();
- receiver.getSession().getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- sendingLinks.clear();
- receivingLinks.clear();
- }
- else if(CREATE_LINK.equals(op))
- {
- Object linkRef = map.get(LINK_REF);
- String host = (String) map.get(HOST);
- Object o = map.get(PORT);
- int port = Integer.parseInt(String.valueOf(o));
- String user = (String) map.get(SASL_USER);
- String password = (String) map.get(SASL_PASSWORD);
- String role = (String) map.get(ROLE);
- String address = (String) map.get(ADDRESS);
- System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password);
- try{
-
-
- Connection conn2 = new Connection(host, port, user, password, host);
- Session session2 = conn2.createSession();
- if(sendingLinks.containsKey(linkRef))
- {
- try
- {
- sendingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(receivingLinks.containsKey(linkRef))
- {
- try
- {
- receivingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(SENDER.equals(role))
- {
-
- System.err.println("%%% Creating sender (" + linkRef + ")");
- Sender sender = session2.createSender(address);
- sendingLinks.put(linkRef, sender);
- }
- else
- {
-
- System.err.println("%%% Creating receiver (" + linkRef + ")");
- Receiver receiver2 = session2.createReceiver(address);
- receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
- receivingLinks.put(linkRef, receiver2);
- }
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- else if(SEND_MESSAGE.equals(op))
- {
- Sender sender = sendingLinks.get(map.get(LINK_REF));
- Properties m2props = new Properties();
- Object messageId = map.get(MESSAGE_ID);
- m2props.setMessageId(messageId);
- Map m2propmap = new HashMap();
- m2propmap.put(OPCODE, TEST);
- m2propmap.put(VENDOR, vendor);
- ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
- Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
- try
- {
- sender.send(m2);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, SENT);
- m3propmap.put(MESSAGE_ID, messageId);
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, vendor);
-
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+messageId)));
- try
- {
- s.send(m3);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- responseReceiver.acknowledge(m);
- }
- else
- {
- for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet())
- {
- m = entry.getValue().receive(false);
- if(m != null)
- {
- wait = false;
-
- System.err.println("%%% Received message from " + entry.getKey());
-
- Properties mp = m.getProperties();
- ApplicationProperties ap = m.getApplicationProperties();
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, RECEIVED);
- m3propmap.put(MESSAGE_ID, mp.getMessageId());
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR));
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+mp.getMessageId())));
- try
- {
- s.send(m3);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- entry.getValue().acknowledge(m);
- }
-
- }
- }
-
- if(wait)
- {
- try
- {
- Thread.sleep(500l);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- }
-
-
-
-
-
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- }
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return false;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; + +public class Demo extends Util +{ + private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:"; + private static final String OPCODE = "opcode"; + private static final String ACTION = "action"; + private static final String MESSAGE_ID = "message-id"; + private static final String VENDOR = "vendor"; + private static final String LOG = "log"; + private static final String RECEIVED = "received"; + private static final String TEST = "test"; + private static final String APACHE = "apache"; + private static final String SENT = "sent"; + private static final String LINK_REF = "link-ref"; + private static final String HOST = "host"; + private static final String PORT = "port"; + private static final String SASL_USER = "sasl-user"; + private static final String SASL_PASSWORD = "sasl-password"; + private static final String ROLE = "role"; + private static final String ADDRESS = "address"; + private static final String SENDER = "sender"; + private static final String SEND_MESSAGE = "send-message"; + private static final String ANNOUNCE = "announce"; + private static final String MESSAGE_VENDOR = "message-vendor"; + private static final String CREATE_LINK = "create-link"; + + public static void main(String[] args) + { + new Demo(args).run(); + } + + public Demo(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected boolean hasWindowSizeOption() + { + return false; + } + + public void run() + { + + try + { + + final String vendor = getArgs()[0]; + final String queue = "control"; + + String message = ""; + + Connection conn = newConnection(); + Session session = conn.createSession(); + + + Receiver responseReceiver; + + responseReceiver = session.createTemporaryQueueReceiver(); + + + + + responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + + + Sender s = session.createSender(queue, getWindowSize(), getMode()); + + + Properties properties = new Properties(); + properties.setMessageId(java.util.UUID.randomUUID()); + properties.setReplyTo(responseReceiver.getAddress()); + + HashMap appPropMap = new HashMap(); + ApplicationProperties appProperties = new ApplicationProperties(appPropMap); + + appPropMap.put(OPCODE, ANNOUNCE); + appPropMap.put(VENDOR, vendor); + appPropMap.put(ADDRESS,responseReceiver.getAddress()); + + AmqpValue amqpValue = new AmqpValue(message); + Section[] sections = { properties, appProperties, amqpValue}; + final Message message1 = new Message(Arrays.asList(sections)); + + try + { + s.send(message1); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>(); + Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>(); + + + boolean done = false; + + while(!done) + { + boolean wait = true; + Message m = responseReceiver.receive(false); + if(m != null) + { + List<Section> payload = m.getPayload(); + wait = false; + ApplicationProperties props = m.getApplicationProperties(); + Map map = props.getValue(); + String op = (String) map.get(OPCODE); + if("reset".equals(op)) + { + for(Sender sender : sendingLinks.values()) + { + try + { + sender.close(); + Session session1 = sender.getSession(); + session1.close(); + session1.getConnection().close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + for(Receiver receiver : receivingLinks.values()) + { + try + { + receiver.close(); + receiver.getSession().close(); + receiver.getSession().getConnection().close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + sendingLinks.clear(); + receivingLinks.clear(); + } + else if(CREATE_LINK.equals(op)) + { + Object linkRef = map.get(LINK_REF); + String host = (String) map.get(HOST); + Object o = map.get(PORT); + int port = Integer.parseInt(String.valueOf(o)); + String user = (String) map.get(SASL_USER); + String password = (String) map.get(SASL_PASSWORD); + String role = (String) map.get(ROLE); + String address = (String) map.get(ADDRESS); + System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password); + try{ + + + Connection conn2 = new Connection(host, port, user, password, host); + Session session2 = conn2.createSession(); + if(sendingLinks.containsKey(linkRef)) + { + try + { + sendingLinks.remove(linkRef).close(); + } + catch (Exception e) + { + + } + } + if(receivingLinks.containsKey(linkRef)) + { + try + { + receivingLinks.remove(linkRef).close(); + } + catch (Exception e) + { + + } + } + if(SENDER.equals(role)) + { + + System.err.println("%%% Creating sender (" + linkRef + ")"); + Sender sender = session2.createSender(address); + sendingLinks.put(linkRef, sender); + } + else + { + + System.err.println("%%% Creating receiver (" + linkRef + ")"); + Receiver receiver2 = session2.createReceiver(address); + receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + receivingLinks.put(linkRef, receiver2); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + else if(SEND_MESSAGE.equals(op)) + { + Sender sender = sendingLinks.get(map.get(LINK_REF)); + Properties m2props = new Properties(); + Object messageId = map.get(MESSAGE_ID); + m2props.setMessageId(messageId); + Map m2propmap = new HashMap(); + m2propmap.put(OPCODE, TEST); + m2propmap.put(VENDOR, vendor); + ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); + Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); + try + { + sender.send(m2); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + Map m3propmap = new HashMap(); + m3propmap.put(OPCODE, LOG); + m3propmap.put(ACTION, SENT); + m3propmap.put(MESSAGE_ID, messageId); + m3propmap.put(VENDOR, vendor); + m3propmap.put(MESSAGE_VENDOR, vendor); + + + Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), + new AmqpValue("AMQP-"+messageId))); + try + { + s.send(m3); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + responseReceiver.acknowledge(m); + } + else + { + for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet()) + { + m = entry.getValue().receive(false); + if(m != null) + { + wait = false; + + System.err.println("%%% Received message from " + entry.getKey()); + + Properties mp = m.getProperties(); + ApplicationProperties ap = m.getApplicationProperties(); + + Map m3propmap = new HashMap(); + m3propmap.put(OPCODE, LOG); + m3propmap.put(ACTION, RECEIVED); + m3propmap.put(MESSAGE_ID, mp.getMessageId()); + m3propmap.put(VENDOR, vendor); + m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR)); + + Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), + new AmqpValue("AMQP-"+mp.getMessageId()))); + try + { + s.send(m3); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + entry.getValue().acknowledge(m); + } + + } + } + + if(wait) + { + try + { + Thread.sleep(500l); + } + catch (InterruptedException e) + { + e.printStackTrace(); //TODO. + } + } + + } + + + + + + + + + + s.close(); + session.close(); + conn.close(); + + } + catch (ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return false; + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java index dbe273182f..7845e318cb 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java @@ -1,236 +1,236 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-import java.util.Arrays;
-
-public class Request extends Util
-{
- private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:";
-
- public static void main(String[] args)
- {
- new Request(args).run();
- }
-
- public Request(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- public void run()
- {
-
- try
- {
-
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
- Connection conn2;
- Session session2;
- Receiver responseReceiver;
-
- if(isUseMultipleConnections())
- {
- conn2 = newConnection();
- session2 = conn2.createSession();
- responseReceiver = session2.createTemporaryQueueReceiver();
- }
- else
- {
- conn2 = null;
- session2 = null;
- responseReceiver = session.createTemporaryQueueReceiver();
- }
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode());
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- int received = 0;
-
- if(getArgs().length >= 2)
- {
- message = getArgs()[1];
- if(message.length() < getMessageSize())
- {
- StringBuilder builder = new StringBuilder(getMessageSize());
- builder.append(message);
- for(int x = message.length(); x < getMessageSize(); x++)
- {
- builder.append('.');
- }
- message = builder.toString();
- }
-
- for(int i = 0; i < getCount(); i++)
- {
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- properties.setReplyTo(responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { new Header() , properties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
-
- Message responseMessage = responseReceiver.receive(false);
- if(responseMessage != null)
- {
- responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn);
- received++;
- }
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
-
- while(received < getCount())
- {
- Message responseMessage = responseReceiver.receive();
- responseReceiver.acknowledge(responseMessage.getDeliveryTag());
- received++;
- }
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- if(session2 != null)
- {
- session2.close();
- conn2.close();
- }
- }
- catch (Exception e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return true;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +import java.util.Arrays; + +public class Request extends Util +{ + private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:"; + + public static void main(String[] args) + { + new Request(args).run(); + } + + public Request(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return true; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + public void run() + { + + try + { + + + final String queue = getArgs()[0]; + + String message = ""; + + Connection conn = newConnection(); + Session session = conn.createSession(); + + Connection conn2; + Session session2; + Receiver responseReceiver; + + if(isUseMultipleConnections()) + { + conn2 = newConnection(); + session2 = conn2.createSession(); + responseReceiver = session2.createTemporaryQueueReceiver(); + } + else + { + conn2 = null; + session2 = null; + responseReceiver = session.createTemporaryQueueReceiver(); + } + + + + + responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + + + Sender s = session.createSender(queue, getWindowSize(), getMode()); + + Transaction txn = null; + + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + } + + int received = 0; + + if(getArgs().length >= 2) + { + message = getArgs()[1]; + if(message.length() < getMessageSize()) + { + StringBuilder builder = new StringBuilder(getMessageSize()); + builder.append(message); + for(int x = message.length(); x < getMessageSize(); x++) + { + builder.append('.'); + } + message = builder.toString(); + } + + for(int i = 0; i < getCount(); i++) + { + Properties properties = new Properties(); + properties.setMessageId(UnsignedLong.valueOf(i)); + properties.setReplyTo(responseReceiver.getAddress()); + + AmqpValue amqpValue = new AmqpValue(message); + Section[] sections = { new Header() , properties, amqpValue}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1, txn); + + Message responseMessage = responseReceiver.receive(false); + if(responseMessage != null) + { + responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn); + received++; + } + } + } + + if(txn != null) + { + txn.commit(); + } + + + while(received < getCount()) + { + Message responseMessage = responseReceiver.receive(); + responseReceiver.acknowledge(responseMessage.getDeliveryTag()); + received++; + } + + + + + s.close(); + session.close(); + conn.close(); + + if(session2 != null) + { + session2.close(); + conn2.close(); + } + } + catch (Exception e) + { + e.printStackTrace(); //TODO. + } + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return true; + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java index b4ae16ab3f..36aadc7851 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java @@ -1,235 +1,235 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
-import java.util.Arrays;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-public class Send extends Util
-{
- private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:";
- private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
-
-
- public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, ConnectionException
- {
- new Send(args).run();
- }
-
-
- public Send(final String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasSubjectOption()
- {
- return true;
- }
-
- public void run()
- {
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- try
- {
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName());
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- if(!useStdIn())
- {
- if(getArgs().length <= 2)
- {
- if(getArgs().length == 2)
- {
- message = getArgs()[1];
- }
- for(int i = 0; i < getCount(); i++)
- {
-
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- if(getSubject() != null)
- {
- properties.setSubject(getSubject());
- }
- Section bodySection;
- byte[] bytes = (message + " " + i).getBytes();
- if(bytes.length < getMessageSize())
- {
- byte[] origBytes = bytes;
- bytes = new byte[getMessageSize()];
- System.arraycopy(origBytes,0,bytes,0,origBytes.length);
- for(int x = origBytes.length; x < bytes.length; x++)
- {
- bytes[x] = (byte) '.';
- }
- bodySection = new Data(new Binary(bytes));
- }
- else
- {
- bodySection = new AmqpValue(message + " " + i);
- }
-
- Section[] sections = {properties, bodySection};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
- }
- }
- else
- {
- for(int i = 1; i < getArgs().length; i++)
- {
- s.send(new Message(getArgs()[i]), txn);
- }
-
- }
- }
- else
- {
- LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in));
-
-
- try
- {
- while((message = buf.readLine()) != null)
- {
- s.send(new Message(message), txn);
- }
- }
- catch (IOException e)
- {
- // TODO
- e.printStackTrace();
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
- s.close();
-
- session.close();
- conn.close();
- }
- catch (Exception e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.util.Arrays; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +public class Send extends Util +{ + private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:"; + private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'}; + + + public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, ConnectionException + { + new Send(args).run(); + } + + + public Send(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return true; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return true; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + @Override + protected boolean hasSubjectOption() + { + return true; + } + + public void run() + { + + final String queue = getArgs()[0]; + + String message = ""; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + + Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName()); + + Transaction txn = null; + + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + } + + if(!useStdIn()) + { + if(getArgs().length <= 2) + { + if(getArgs().length == 2) + { + message = getArgs()[1]; + } + for(int i = 0; i < getCount(); i++) + { + + Properties properties = new Properties(); + properties.setMessageId(UnsignedLong.valueOf(i)); + if(getSubject() != null) + { + properties.setSubject(getSubject()); + } + Section bodySection; + byte[] bytes = (message + " " + i).getBytes(); + if(bytes.length < getMessageSize()) + { + byte[] origBytes = bytes; + bytes = new byte[getMessageSize()]; + System.arraycopy(origBytes,0,bytes,0,origBytes.length); + for(int x = origBytes.length; x < bytes.length; x++) + { + bytes[x] = (byte) '.'; + } + bodySection = new Data(new Binary(bytes)); + } + else + { + bodySection = new AmqpValue(message + " " + i); + } + + Section[] sections = {properties, bodySection}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1, txn); + } + } + else + { + for(int i = 1; i < getArgs().length; i++) + { + s.send(new Message(getArgs()[i]), txn); + } + + } + } + else + { + LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in)); + + + try + { + while((message = buf.readLine()) != null) + { + s.send(new Message(message), txn); + } + } + catch (IOException e) + { + // TODO + e.printStackTrace(); + } + } + + if(txn != null) + { + txn.commit(); + } + + s.close(); + + session.close(); + conn.close(); + } + catch (Exception e) + { + e.printStackTrace(); //TODO. + } + + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index 4c5ffeb177..f66a33b978 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -1,412 +1,412 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.net.ssl.SSLSocketFactory;
-import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
-import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-
-public class Connection
-{
- private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
- private static final int MAX_FRAME_SIZE = 65536;
-
- private String _address;
- private ConnectionEndpoint _conn;
- private int _sessionCount;
-
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password, String remoteHostname) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,new Container());
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container) throws ConnectionException
- {
- this(address,port,username,password,MAX_FRAME_SIZE,container);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,container, null);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final String remoteHost,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final String remoteHost,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
- }
-
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname, boolean ssl) throws ConnectionException
- {
-
- _address = address;
-
- try
- {
- final Socket s;
- if(ssl)
- {
- s = SSLSocketFactory.getDefault().createSocket(address, port);
- }
- else
- {
- s = new Socket(address, port);
- }
-
-
- Principal principal = username == null ? null : new Principal()
- {
-
- public String getName()
- {
- return username;
- }
- };
- _conn = new ConnectionEndpoint(container, principal, password);
- _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
- _conn.setRemoteAddress(s.getRemoteSocketAddress());
- _conn.setRemoteHostname(remoteHostname);
-
-
-
- ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
-
-
- final OutputStream outputStream = s.getOutputStream();
-
- ConnectionHandler.BytesSource src;
-
- if(_conn.requiresSASL())
- {
- ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
-
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)3,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
- new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
-
- _conn.setSaslFrameOutput(saslOut);
- }
- else
- {
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
- }
-
-
- ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
- Thread outputThread = new Thread(outputHandler);
- outputThread.setDaemon(true);
- outputThread.start();
- _conn.setFrameOutputHandler(out);
-
-
-
- final ConnectionHandler handler = new ConnectionHandler(_conn);
- final InputStream inputStream = s.getInputStream();
-
- Thread inputThread = new Thread(new Runnable()
- {
-
- public void run()
- {
- try
- {
- doRead(handler, inputStream);
- }
- finally
- {
- if(_conn.closedForInput() && _conn.closedForOutput())
- {
- try
- {
- s.close();
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- });
-
- inputThread.setDaemon(true);
- inputThread.start();
-
- _conn.open();
-
- }
- catch (IOException e)
- {
- throw new ConnectionException(e);
- }
-
-
- }
-
- private Connection(ConnectionEndpoint endpoint)
- {
- _conn = endpoint;
- }
-
-
- private void doRead(final AMQPTransport transport, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
- ByteBuffer bbuf = ByteBuffer.wrap(buf);
- final Object lock = new Object();
- transport.setInputStateChangeListener(new StateChangeListener(){
-
- public void onStateChange(final boolean active)
- {
- synchronized(lock)
- {
- lock.notifyAll();
- }
- }
- });
-
- try
- {
- int read;
- while((read = inputStream.read(buf)) != -1)
- {
- bbuf.position(0);
- bbuf.limit(read);
-
- while(bbuf.hasRemaining() && transport.isOpenForInput())
- {
- transport.processBytes(bbuf);
- }
-
-
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
-
- }
-
- public Session createSession() throws ConnectionException
- {
- checkNotClosed();
- Session session = new Session(this,String.valueOf(_sessionCount++));
- return session;
- }
-
- void checkNotClosed() throws ConnectionClosedException
- {
- if(getEndpoint().isClosed())
- {
- throw new ConnectionClosedException(getEndpoint().getRemoteError());
- }
- }
-
- public ConnectionEndpoint getEndpoint()
- {
- return _conn;
- }
-
- public void awaitOpen()
- {
- synchronized(getEndpoint().getLock())
- {
- while(!getEndpoint().isOpen() && !getEndpoint().isClosed())
- {
- try
- {
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
-
- private void doRead(final ConnectionHandler handler, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
-
-
- try
- {
- int read;
- boolean done = false;
- while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
- {
- ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
- Binary b = new Binary(buf,0,read);
-
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
- }
- while(bbuf.hasRemaining() && !handler.isDone())
- {
- handler.parse(bbuf);
- }
-
-
- }
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
- public void close()
- {
- _conn.close();
-
- synchronized (_conn.getLock())
- {
- while(!_conn.closedForInput())
- {
- try
- {
- _conn.getLock().wait();
- }
- catch (InterruptedException e)
- {
-
- }
- }
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.net.ssl.SSLSocketFactory; +import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; +import org.apache.qpid.amqp_1_0.transport.AMQPTransport; +import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; +import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.qpid.amqp_1_0.transport.StateChangeListener; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.FrameBody; +import org.apache.qpid.amqp_1_0.type.SaslFrameBody; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; + +public class Connection +{ + private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); + private static final int MAX_FRAME_SIZE = 65536; + + private String _address; + private ConnectionEndpoint _conn; + private int _sessionCount; + + + public Connection(final String address, + final int port, + final String username, + final String password) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE); + } + + public Connection(final String address, + final int port, + final String username, + final String password, String remoteHostname) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final int maxFrameSize) throws ConnectionException + { + this(address,port,username,password,maxFrameSize,new Container()); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final Container container) throws ConnectionException + { + this(address,port,username,password,MAX_FRAME_SIZE,container); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final int maxFrameSize, + final Container container) throws ConnectionException + { + this(address,port,username,password,maxFrameSize,container, null); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final int maxFrameSize, + final Container container, + final String remoteHostname) throws ConnectionException + { + this(address,port,username,password,maxFrameSize,container,remoteHostname,false); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final Container container, + final boolean ssl) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final String remoteHost, + final boolean ssl) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl); + } + + public Connection(final String address, + final int port, + final String username, + final String password, + final Container container, + final String remoteHost, + final boolean ssl) throws ConnectionException + { + this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl); + } + + + public Connection(final String address, + final int port, + final String username, + final String password, + final int maxFrameSize, + final Container container, + final String remoteHostname, boolean ssl) throws ConnectionException + { + + _address = address; + + try + { + final Socket s; + if(ssl) + { + s = SSLSocketFactory.getDefault().createSocket(address, port); + } + else + { + s = new Socket(address, port); + } + + + Principal principal = username == null ? null : new Principal() + { + + public String getName() + { + return username; + } + }; + _conn = new ConnectionEndpoint(container, principal, password); + _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); + _conn.setRemoteAddress(s.getRemoteSocketAddress()); + _conn.setRemoteHostname(remoteHostname); + + + + ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn); + + + final OutputStream outputStream = s.getOutputStream(); + + ConnectionHandler.BytesSource src; + + if(_conn.requiresSASL()) + { + ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn); + + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)3, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()), + new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + ); + + _conn.setSaslFrameOutput(saslOut); + } + else + { + src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A', + (byte)'M', + (byte)'Q', + (byte)'P', + (byte)0, + (byte)1, + (byte)0, + (byte)0), + new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry()) + ); + } + + + ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn); + Thread outputThread = new Thread(outputHandler); + outputThread.setDaemon(true); + outputThread.start(); + _conn.setFrameOutputHandler(out); + + + + final ConnectionHandler handler = new ConnectionHandler(_conn); + final InputStream inputStream = s.getInputStream(); + + Thread inputThread = new Thread(new Runnable() + { + + public void run() + { + try + { + doRead(handler, inputStream); + } + finally + { + if(_conn.closedForInput() && _conn.closedForOutput()) + { + try + { + s.close(); + } + catch (IOException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } + }); + + inputThread.setDaemon(true); + inputThread.start(); + + _conn.open(); + + } + catch (IOException e) + { + throw new ConnectionException(e); + } + + + } + + private Connection(ConnectionEndpoint endpoint) + { + _conn = endpoint; + } + + + private void doRead(final AMQPTransport transport, final InputStream inputStream) + { + byte[] buf = new byte[2<<15]; + ByteBuffer bbuf = ByteBuffer.wrap(buf); + final Object lock = new Object(); + transport.setInputStateChangeListener(new StateChangeListener(){ + + public void onStateChange(final boolean active) + { + synchronized(lock) + { + lock.notifyAll(); + } + } + }); + + try + { + int read; + while((read = inputStream.read(buf)) != -1) + { + bbuf.position(0); + bbuf.limit(read); + + while(bbuf.hasRemaining() && transport.isOpenForInput()) + { + transport.processBytes(bbuf); + } + + + } + } + catch (IOException e) + { + e.printStackTrace(); + } + + } + + public Session createSession() throws ConnectionException + { + checkNotClosed(); + Session session = new Session(this,String.valueOf(_sessionCount++)); + return session; + } + + void checkNotClosed() throws ConnectionClosedException + { + if(getEndpoint().isClosed()) + { + throw new ConnectionClosedException(getEndpoint().getRemoteError()); + } + } + + public ConnectionEndpoint getEndpoint() + { + return _conn; + } + + public void awaitOpen() + { + synchronized(getEndpoint().getLock()) + { + while(!getEndpoint().isOpen() && !getEndpoint().isClosed()) + { + try + { + getEndpoint().getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + } + + private void doRead(final ConnectionHandler handler, final InputStream inputStream) + { + byte[] buf = new byte[2<<15]; + + + try + { + int read; + boolean done = false; + while(!handler.isDone() && (read = inputStream.read(buf)) != -1) + { + ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); + Binary b = new Binary(buf,0,read); + + if(RAW_LOGGER.isLoggable(Level.FINE)) + { + RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString()); + } + while(bbuf.hasRemaining() && !handler.isDone()) + { + handler.parse(bbuf); + } + + + } + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + public void close() + { + _conn.close(); + + synchronized (_conn.getLock()) + { + while(!_conn.closedForInput()) + { + try + { + _conn.getLock().wait(); + } + catch (InterruptedException e) + { + + } + } + } + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java index 7c1172898b..e8ac1de6c1 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java @@ -1,148 +1,148 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-public class Message
-{
- private Binary _deliveryTag;
- private List<Section> _payload = new ArrayList<Section>();
- private Boolean _resume;
- private boolean _settled;
- private DeliveryState _deliveryState;
- private Receiver _receiver;
-
-
- public Message()
- {
- }
-
- public Message(Collection<Section> sections)
- {
- _payload.addAll(sections);
- }
-
- public Message(Section section)
- {
- this(Collections.singletonList(section));
- }
-
- public Message(String message)
- {
- this(new AmqpValue(message));
- }
-
-
- public Binary getDeliveryTag()
- {
- return _deliveryTag;
- }
-
- public void setDeliveryTag(Binary deliveryTag)
- {
- _deliveryTag = deliveryTag;
- }
-
- public List<Section> getPayload()
- {
- return Collections.unmodifiableList(_payload);
- }
-
- private <T extends Section> T getSection(Class<T> clazz)
- {
- for(Section s : _payload)
- {
- if(clazz.isAssignableFrom(s.getClass()))
- {
- return (T) s;
- }
- }
- return null;
- }
-
- public ApplicationProperties getApplicationProperties()
- {
- return getSection(ApplicationProperties.class);
- }
-
- public Properties getProperties()
- {
- return getSection(Properties.class);
- }
-
- public Header getHeader()
- {
- return getSection(Header.class);
- }
-
-
- public void setResume(final Boolean resume)
- {
- _resume = resume;
- }
-
- public boolean isResume()
- {
- return Boolean.TRUE.equals(_resume);
- }
-
- public void setDeliveryState(DeliveryState state)
- {
- _deliveryState = state;
- }
-
- public DeliveryState getDeliveryState()
- {
- return _deliveryState;
- }
-
- public void setSettled(boolean settled)
- {
- _settled = settled;
- }
-
- public boolean getSettled()
- {
- return _settled;
- }
-
- public void setReceiver(final Receiver receiver)
- {
- _receiver = receiver;
- }
-
- public Receiver getReceiver()
- {
- return _receiver;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class Message +{ + private Binary _deliveryTag; + private List<Section> _payload = new ArrayList<Section>(); + private Boolean _resume; + private boolean _settled; + private DeliveryState _deliveryState; + private Receiver _receiver; + + + public Message() + { + } + + public Message(Collection<Section> sections) + { + _payload.addAll(sections); + } + + public Message(Section section) + { + this(Collections.singletonList(section)); + } + + public Message(String message) + { + this(new AmqpValue(message)); + } + + + public Binary getDeliveryTag() + { + return _deliveryTag; + } + + public void setDeliveryTag(Binary deliveryTag) + { + _deliveryTag = deliveryTag; + } + + public List<Section> getPayload() + { + return Collections.unmodifiableList(_payload); + } + + private <T extends Section> T getSection(Class<T> clazz) + { + for(Section s : _payload) + { + if(clazz.isAssignableFrom(s.getClass())) + { + return (T) s; + } + } + return null; + } + + public ApplicationProperties getApplicationProperties() + { + return getSection(ApplicationProperties.class); + } + + public Properties getProperties() + { + return getSection(Properties.class); + } + + public Header getHeader() + { + return getSection(Header.class); + } + + + public void setResume(final Boolean resume) + { + _resume = resume; + } + + public boolean isResume() + { + return Boolean.TRUE.equals(_resume); + } + + public void setDeliveryState(DeliveryState state) + { + _deliveryState = state; + } + + public DeliveryState getDeliveryState() + { + return _deliveryState; + } + + public void setSettled(boolean settled) + { + _settled = settled; + } + + public boolean getSettled() + { + return _settled; + } + + public void setReceiver(final Receiver receiver) + { + _receiver = receiver; + } + + public Receiver getReceiver() + { + return _receiver; + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index 596931088f..5175d1d847 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -1,615 +1,615 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
-
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class Receiver implements DeliveryStateHandler
-{
- private ReceivingLinkEndpoint _endpoint;
- private int _id;
- private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100);
- private Session _session;
-
- private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
- private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
- private MessageArrivalListener _messageArrivalListener;
- private org.apache.qpid.amqp_1_0.type.transport.Error _error;
- private Runnable _remoteErrorTask;
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode) throws ConnectionErrorException
- {
- this(session, linkName, target, source, ackMode, false);
- }
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode,
- boolean isDurable) throws ConnectionErrorException
- {
- this(session,linkName,target,source,ackMode,isDurable,null);
- }
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode,
- final boolean isDurable,
- final Map<Binary,Outcome> unsettled) throws ConnectionErrorException
- {
-
- session.getConnection().checkNotClosed();
- _session = session;
- if(isDurable)
- {
- source.setDurable(TerminusDurability.UNSETTLED_STATE);
- source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
- else if(source != null)
- {
- source.setDurable(TerminusDurability.NONE);
- source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
- }
- _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
- UnsignedInteger.ZERO);
-
- _endpoint.setDeliveryStateHandler(this);
-
- switch(ackMode)
- {
- case ALO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case EO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
-
- _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
- {
- @Override public void messageTransfer(final Transfer xfr)
- {
- _prefetchQueue.add(xfr);
- postPrefetchAction();
- }
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _error = detach.getError();
- if(detach.getError()!=null)
- {
- remoteError();
- }
- super.remoteDetached(endpoint, detach);
- }
- });
-
- _endpoint.setLocalUnsettled(unsettled);
- _endpoint.attach();
-
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isAttached() && !_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- if(_endpoint.getSource() == null)
- {
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- throw new ConnectionErrorException(getError());
- }
- else
- {
-
- }
- }
-
- private void remoteError()
- {
- if(_remoteErrorTask != null)
- {
- _remoteErrorTask.run();
- }
- }
-
- private void postPrefetchAction()
- {
- if(_messageArrivalListener != null)
- {
- _messageArrivalListener.messageArrived(this);
- }
- }
-
- public void setCredit(UnsignedInteger credit, boolean window)
- {
- _endpoint.setLinkCredit(credit);
- _endpoint.setCreditWindow(window);
-
- }
-
-
- public String getAddress()
- {
- return ((Source)_endpoint.getSource()).getAddress();
- }
-
- public Map getFilter()
- {
- return ((Source)_endpoint.getSource()).getFilter();
- }
-
- public Message receive()
- {
- return receive(-1L);
- }
-
- public Message receive(boolean wait)
- {
- return receive(wait ? -1L : 0L);
- }
-
- // 0 means no wait, -1 wait forever
- public Message receive(long wait)
- {
- Message m = null;
- Transfer xfr;
- long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
-
- while((xfr = receiveFromPrefetch(wait)) != null )
- {
-
- if(!Boolean.TRUE.equals(xfr.getAborted()))
- {
- Binary deliveryTag = xfr.getDeliveryTag();
- Boolean resume = xfr.getResume();
-
- List<Section> sections = new ArrayList<Section>();
- List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
- int totalSize = 0;
-
- boolean hasMore;
- do
- {
- hasMore = Boolean.TRUE.equals(xfr.getMore());
-
- ByteBuffer buf = xfr.getPayload();
-
- if(buf != null)
- {
-
- totalSize += buf.remaining();
-
- payloads.add(buf);
- }
- if(hasMore)
- {
- xfr = receiveFromPrefetch(-1l);
- if(xfr== null)
- {
- // TODO - this is wrong!!!!
- System.out.println("eeek");
- }
- }
- }
- while(hasMore && !Boolean.TRUE.equals(xfr.getAborted()));
-
- if(!Boolean.TRUE.equals(xfr.getAborted()))
- {
- ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
- for(ByteBuffer payload : payloads)
- {
- allPayload.put(payload);
- }
- allPayload.flip();
- SectionDecoder decoder = _session.getSectionDecoder();
-
- try
- {
- sections = decoder.parseAll(allPayload);
- }
- catch (AmqpErrorException e)
- {
- // todo - throw a sensible error
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- m = new Message(sections);
- m.setDeliveryTag(deliveryTag);
- m.setResume(resume);
- m.setReceiver(this);
- break;
- }
- }
-
- if(wait > 0L)
- {
- wait = endTime - System.currentTimeMillis();
- if(wait <=0L)
- {
- break;
- }
- }
- }
-
-
- return m;
-
- }
-
- private Transfer receiveFromPrefetch(long wait)
- {
- long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L);
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- Transfer xfr;
- while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached()
- && wait != 0)
- {
- try
- {
- if(wait>0L)
- {
- lock.wait(wait);
- }
- else if(wait<0L)
- {
- lock.wait();
- }
- }
- catch (InterruptedException e)
- {
- return null;
- }
- if(wait > 0L)
- {
- wait = endTime - System.currentTimeMillis();
- if(wait <= 0L)
- {
- break;
- }
- }
-
- }
- if(xfr != null)
- {
- _prefetchQueue.poll();
-
- }
-
- return xfr;
- }
-
- }
-
-
- public void release(final Message m)
- {
- release(m.getDeliveryTag());
- }
-
- public void release(Binary deliveryTag)
- {
- update(new Released(), deliveryTag, null, null);
- }
-
-
- public void modified(Binary tag)
- {
- final Modified outcome = new Modified();
- outcome.setDeliveryFailed(true);
-
- update(outcome, tag, null, null);
- }
-
- public void acknowledge(final Message m)
- {
- acknowledge(m.getDeliveryTag());
- }
-
- public void acknowledge(final Message m, SettledAction a)
- {
- acknowledge(m.getDeliveryTag(), a);
- }
-
-
- public void acknowledge(final Message m, Transaction txn)
- {
- acknowledge(m.getDeliveryTag(), txn);
- }
-
-
- public void acknowledge(final Binary deliveryTag)
- {
- acknowledge(deliveryTag, null, null);
- }
-
-
- public void acknowledge(final Binary deliveryTag, SettledAction a)
- {
- acknowledge(deliveryTag, null, a);
- }
-
- public void acknowledge(final Binary deliveryTag, final Transaction txn)
- {
- acknowledge(deliveryTag, txn, null);
- }
-
- public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- update(new Accepted(), deliveryTag, txn, action);
- }
-
- public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action)
- {
-
- DeliveryState state;
- if(txn != null)
- {
- TransactionalState txnState = new TransactionalState();
- txnState.setOutcome(outcome);
- txnState.setTxnId(txn.getTxnId());
- state = txnState;
- }
- else
- {
- state = (DeliveryState) outcome;
- }
- boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
- if(!(settled || action == null))
- {
- _unsettledMap.put(deliveryTag, action);
- }
-
- _endpoint.updateDisposition(deliveryTag,state, settled);
- }
-
- public Error getError()
- {
- return _error;
- }
-
- public void acknowledgeAll(Message m)
- {
- acknowledgeAll(m.getDeliveryTag());
- }
-
- public void acknowledgeAll(Binary deliveryTag)
- {
- acknowledgeAll(deliveryTag, null, null);
- }
-
- public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- updateAll(new Accepted(), deliveryTag, txn, action);
- }
-
- public void updateAll(Outcome outcome, Binary deliveryTag)
- {
- updateAll(outcome, deliveryTag, null, null);
- }
-
- public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- DeliveryState state;
-
- if(txn != null)
- {
- TransactionalState txnState = new TransactionalState();
- txnState.setOutcome(outcome);
- txnState.setTxnId(txn.getTxnId());
- state = txnState;
- }
- else
- {
- state = (DeliveryState) outcome;
- }
- boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
- if(!(settled || action == null))
- {
- _unsettledMap.put(deliveryTag, action);
- }
- _endpoint.updateAllDisposition(deliveryTag, state, settled);
- }
-
-
-
- public void close()
- {
- _endpoint.setTarget(null);
- _endpoint.close();
- Message msg;
- while((msg = receive(-1l)) != null)
- {
- release(msg);
- }
-
- }
-
-
- public void detach()
- {
- _endpoint.setTarget(null);
- _endpoint.detach();
- Message msg;
- while((msg = receive(-1l)) != null)
- {
- release(msg);
- }
-
- }
-
- public void drain()
- {
- _endpoint.drain();
- }
-
- /**
- * Waits for the receiver to drain or a message to be available to be received.
- * @return true if the receiver has been drained.
- */
- public boolean drainWait()
- {
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- try
- {
- while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
- {
- lock.wait();
- }
- }
- catch (InterruptedException e)
- {
- }
- }
- return _prefetchQueue.peek()==null && _endpoint.isDrained();
- }
-
- /**
- * Clears the receiver drain so that message delivery can resume.
- */
- public void clearDrain()
- {
- _endpoint.clearDrain();
- }
-
- public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
- {
- _endpoint.setLinkCredit(credit);
- _endpoint.setTransactionId(txn == null ? null : txn.getTxnId());
- _endpoint.setCreditWindow(false);
-
- }
-
- public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
- {
- if(Boolean.TRUE.equals(settled))
- {
- SettledAction action = _unsettledMap.remove(deliveryTag);
- if(action != null)
- {
- action.onSettled(deliveryTag);
- }
- }
- }
-
- public Map<Binary, Outcome> getRemoteUnsettled()
- {
- return _endpoint.getInitialUnsettledMap();
- }
-
-
- public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
- {
- synchronized(_endpoint.getLock())
- {
- _messageArrivalListener = messageArrivalListener;
- int prefetchSize = _prefetchQueue.size();
- for(int i = 0; i < prefetchSize; i++)
- {
- postPrefetchAction();
- }
- }
- }
-
- public Session getSession()
- {
- return _session;
- }
-
- public org.apache.qpid.amqp_1_0.type.Source getSource()
- {
- return _endpoint.getSource();
- }
-
- public static interface SettledAction
- {
- public void onSettled(Binary deliveryTag);
- }
-
-
- public interface MessageArrivalListener
- {
- void messageArrived(Receiver receiver);
- }
-
- public void setRemoteErrorListener(Runnable listener)
- {
- _remoteErrorTask = listener;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; + +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.messaging.Source; +import org.apache.qpid.amqp_1_0.type.messaging.Target; +import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; +import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.amqp_1_0.type.transport.Error; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class Receiver implements DeliveryStateHandler +{ + private ReceivingLinkEndpoint _endpoint; + private int _id; + private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100); + private Session _session; + + private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>(); + private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>(); + private MessageArrivalListener _messageArrivalListener; + private org.apache.qpid.amqp_1_0.type.transport.Error _error; + private Runnable _remoteErrorTask; + + public Receiver(final Session session, + final String linkName, + final Target target, + final Source source, + final AcknowledgeMode ackMode) throws ConnectionErrorException + { + this(session, linkName, target, source, ackMode, false); + } + + public Receiver(final Session session, + final String linkName, + final Target target, + final Source source, + final AcknowledgeMode ackMode, + boolean isDurable) throws ConnectionErrorException + { + this(session,linkName,target,source,ackMode,isDurable,null); + } + + public Receiver(final Session session, + final String linkName, + final Target target, + final Source source, + final AcknowledgeMode ackMode, + final boolean isDurable, + final Map<Binary,Outcome> unsettled) throws ConnectionErrorException + { + + session.getConnection().checkNotClosed(); + _session = session; + if(isDurable) + { + source.setDurable(TerminusDurability.UNSETTLED_STATE); + source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + } + else if(source != null) + { + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + } + _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source, + UnsignedInteger.ZERO); + + _endpoint.setDeliveryStateHandler(this); + + switch(ackMode) + { + case ALO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case AMO: + _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case EO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND); + break; + + } + + _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener() + { + @Override public void messageTransfer(final Transfer xfr) + { + _prefetchQueue.add(xfr); + postPrefetchAction(); + } + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + _error = detach.getError(); + if(detach.getError()!=null) + { + remoteError(); + } + super.remoteDetached(endpoint, detach); + } + }); + + _endpoint.setLocalUnsettled(unsettled); + _endpoint.attach(); + + synchronized(_endpoint.getLock()) + { + while(!_endpoint.isAttached() && !_endpoint.isDetached()) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + if(_endpoint.getSource() == null) + { + synchronized(_endpoint.getLock()) + { + while(!_endpoint.isDetached()) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + throw new ConnectionErrorException(getError()); + } + else + { + + } + } + + private void remoteError() + { + if(_remoteErrorTask != null) + { + _remoteErrorTask.run(); + } + } + + private void postPrefetchAction() + { + if(_messageArrivalListener != null) + { + _messageArrivalListener.messageArrived(this); + } + } + + public void setCredit(UnsignedInteger credit, boolean window) + { + _endpoint.setLinkCredit(credit); + _endpoint.setCreditWindow(window); + + } + + + public String getAddress() + { + return ((Source)_endpoint.getSource()).getAddress(); + } + + public Map getFilter() + { + return ((Source)_endpoint.getSource()).getFilter(); + } + + public Message receive() + { + return receive(-1L); + } + + public Message receive(boolean wait) + { + return receive(wait ? -1L : 0L); + } + + // 0 means no wait, -1 wait forever + public Message receive(long wait) + { + Message m = null; + Transfer xfr; + long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L; + + while((xfr = receiveFromPrefetch(wait)) != null ) + { + + if(!Boolean.TRUE.equals(xfr.getAborted())) + { + Binary deliveryTag = xfr.getDeliveryTag(); + Boolean resume = xfr.getResume(); + + List<Section> sections = new ArrayList<Section>(); + List<ByteBuffer> payloads = new ArrayList<ByteBuffer>(); + int totalSize = 0; + + boolean hasMore; + do + { + hasMore = Boolean.TRUE.equals(xfr.getMore()); + + ByteBuffer buf = xfr.getPayload(); + + if(buf != null) + { + + totalSize += buf.remaining(); + + payloads.add(buf); + } + if(hasMore) + { + xfr = receiveFromPrefetch(-1l); + if(xfr== null) + { + // TODO - this is wrong!!!! + System.out.println("eeek"); + } + } + } + while(hasMore && !Boolean.TRUE.equals(xfr.getAborted())); + + if(!Boolean.TRUE.equals(xfr.getAborted())) + { + ByteBuffer allPayload = ByteBuffer.allocate(totalSize); + for(ByteBuffer payload : payloads) + { + allPayload.put(payload); + } + allPayload.flip(); + SectionDecoder decoder = _session.getSectionDecoder(); + + try + { + sections = decoder.parseAll(allPayload); + } + catch (AmqpErrorException e) + { + // todo - throw a sensible error + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + m = new Message(sections); + m.setDeliveryTag(deliveryTag); + m.setResume(resume); + m.setReceiver(this); + break; + } + } + + if(wait > 0L) + { + wait = endTime - System.currentTimeMillis(); + if(wait <=0L) + { + break; + } + } + } + + + return m; + + } + + private Transfer receiveFromPrefetch(long wait) + { + long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L); + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + Transfer xfr; + while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached() + && wait != 0) + { + try + { + if(wait>0L) + { + lock.wait(wait); + } + else if(wait<0L) + { + lock.wait(); + } + } + catch (InterruptedException e) + { + return null; + } + if(wait > 0L) + { + wait = endTime - System.currentTimeMillis(); + if(wait <= 0L) + { + break; + } + } + + } + if(xfr != null) + { + _prefetchQueue.poll(); + + } + + return xfr; + } + + } + + + public void release(final Message m) + { + release(m.getDeliveryTag()); + } + + public void release(Binary deliveryTag) + { + update(new Released(), deliveryTag, null, null); + } + + + public void modified(Binary tag) + { + final Modified outcome = new Modified(); + outcome.setDeliveryFailed(true); + + update(outcome, tag, null, null); + } + + public void acknowledge(final Message m) + { + acknowledge(m.getDeliveryTag()); + } + + public void acknowledge(final Message m, SettledAction a) + { + acknowledge(m.getDeliveryTag(), a); + } + + + public void acknowledge(final Message m, Transaction txn) + { + acknowledge(m.getDeliveryTag(), txn); + } + + + public void acknowledge(final Binary deliveryTag) + { + acknowledge(deliveryTag, null, null); + } + + + public void acknowledge(final Binary deliveryTag, SettledAction a) + { + acknowledge(deliveryTag, null, a); + } + + public void acknowledge(final Binary deliveryTag, final Transaction txn) + { + acknowledge(deliveryTag, txn, null); + } + + public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action) + { + update(new Accepted(), deliveryTag, txn, action); + } + + public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action) + { + + DeliveryState state; + if(txn != null) + { + TransactionalState txnState = new TransactionalState(); + txnState.setOutcome(outcome); + txnState.setTxnId(txn.getTxnId()); + state = txnState; + } + else + { + state = (DeliveryState) outcome; + } + boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode()); + + if(!(settled || action == null)) + { + _unsettledMap.put(deliveryTag, action); + } + + _endpoint.updateDisposition(deliveryTag,state, settled); + } + + public Error getError() + { + return _error; + } + + public void acknowledgeAll(Message m) + { + acknowledgeAll(m.getDeliveryTag()); + } + + public void acknowledgeAll(Binary deliveryTag) + { + acknowledgeAll(deliveryTag, null, null); + } + + public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action) + { + updateAll(new Accepted(), deliveryTag, txn, action); + } + + public void updateAll(Outcome outcome, Binary deliveryTag) + { + updateAll(outcome, deliveryTag, null, null); + } + + public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action) + { + DeliveryState state; + + if(txn != null) + { + TransactionalState txnState = new TransactionalState(); + txnState.setOutcome(outcome); + txnState.setTxnId(txn.getTxnId()); + state = txnState; + } + else + { + state = (DeliveryState) outcome; + } + boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode()); + + if(!(settled || action == null)) + { + _unsettledMap.put(deliveryTag, action); + } + _endpoint.updateAllDisposition(deliveryTag, state, settled); + } + + + + public void close() + { + _endpoint.setTarget(null); + _endpoint.close(); + Message msg; + while((msg = receive(-1l)) != null) + { + release(msg); + } + + } + + + public void detach() + { + _endpoint.setTarget(null); + _endpoint.detach(); + Message msg; + while((msg = receive(-1l)) != null) + { + release(msg); + } + + } + + public void drain() + { + _endpoint.drain(); + } + + /** + * Waits for the receiver to drain or a message to be available to be received. + * @return true if the receiver has been drained. + */ + public boolean drainWait() + { + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + try + { + while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() ) + { + lock.wait(); + } + } + catch (InterruptedException e) + { + } + } + return _prefetchQueue.peek()==null && _endpoint.isDrained(); + } + + /** + * Clears the receiver drain so that message delivery can resume. + */ + public void clearDrain() + { + _endpoint.clearDrain(); + } + + public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn) + { + _endpoint.setLinkCredit(credit); + _endpoint.setTransactionId(txn == null ? null : txn.getTxnId()); + _endpoint.setCreditWindow(false); + + } + + public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled) + { + if(Boolean.TRUE.equals(settled)) + { + SettledAction action = _unsettledMap.remove(deliveryTag); + if(action != null) + { + action.onSettled(deliveryTag); + } + } + } + + public Map<Binary, Outcome> getRemoteUnsettled() + { + return _endpoint.getInitialUnsettledMap(); + } + + + public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener) + { + synchronized(_endpoint.getLock()) + { + _messageArrivalListener = messageArrivalListener; + int prefetchSize = _prefetchQueue.size(); + for(int i = 0; i < prefetchSize; i++) + { + postPrefetchAction(); + } + } + } + + public Session getSession() + { + return _session; + } + + public org.apache.qpid.amqp_1_0.type.Source getSource() + { + return _endpoint.getSource(); + } + + public static interface SettledAction + { + public void onSettled(Binary deliveryTag); + } + + + public interface MessageArrivalListener + { + void messageArrived(Receiver receiver); + } + + public void setRemoteErrorListener(Runnable listener) + { + _remoteErrorTask = listener; + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index 0feaa48805..851d02a798 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -1,452 +1,452 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.Source;
-import org.apache.qpid.amqp_1_0.type.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class Sender implements DeliveryStateHandler
-{
- private SendingLinkEndpoint _endpoint;
- private int _id;
- private Session _session;
- private int _windowSize;
- private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
- private boolean _closed;
- private Error _error;
- private Runnable _remoteErrorTask;
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, false);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- boolean synchronous)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
- }
-
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, AcknowledgeMode.ALO);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
- source.setAddress(sourceAddr);
- return source;
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
- target.setAddress(targetAddr);
- if(isDurable)
- {
- target.setDurable(TerminusDurability.UNSETTLED_STATE);
- target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
- return target;
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
-
- _session = session;
- session.getConnection().checkNotClosed();
- _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
- source, target, unsettled);
-
-
- switch(mode)
- {
- case ALO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
- break;
- case EO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
- _endpoint.setDeliveryStateHandler(this);
- _endpoint.attach();
- _windowSize = window;
-
- synchronized(_endpoint.getLock())
- {
- while(!(_endpoint.isAttached() || _endpoint.isDetached()))
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderCreationException(e);
- }
- }
- if(_endpoint.getTarget()== null)
- {
- throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
- };
- }
-
- _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
- {
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _error = detach.getError();
- if(_error != null)
- {
- remoteError();
- }
- super.remoteDetached(endpoint, detach);
- }
- });
- }
-
- public Source getSource()
- {
- return _endpoint.getSource();
- }
-
- public Target getTarget()
- {
- return _endpoint.getTarget();
- }
-
- public void send(Message message) throws LinkDetachedException
- {
- send(message, null, null);
- }
-
- public void send(Message message, final OutcomeAction action) throws LinkDetachedException
- {
- send(message, null, action);
- }
-
- public void send(Message message, final Transaction txn) throws LinkDetachedException
- {
- send(message, txn, null);
- }
-
- public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
- {
-
- List<Section> sections = message.getPayload();
-
- Transfer xfr = new Transfer();
-
- if(sections != null && !sections.isEmpty())
- {
- SectionEncoder encoder = _session.getSectionEncoder();
- encoder.reset();
-
- int sectionNumber = 0;
- for(Section section : sections)
- {
- encoder.encodeObject(section);
- }
-
-
- Binary encoding = encoder.getEncoding();
- ByteBuffer payload = encoding.asByteBuffer();
- xfr.setPayload(payload);
- }
- if(message.getDeliveryTag() == null)
- {
- message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
- }
- if(message.isResume())
- {
- xfr.setResume(Boolean.TRUE);
- }
- if(message.getDeliveryState() != null)
- {
- xfr.setState(message.getDeliveryState());
- }
-
- xfr.setDeliveryTag(message.getDeliveryTag());
- //xfr.setSettled(_windowSize ==0);
- if(txn != null)
- {
- xfr.setSettled(false);
- TransactionalState deliveryState = new TransactionalState();
- deliveryState.setTxnId(txn.getTxnId());
- xfr.setState(deliveryState);
- }
- else
- {
- xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
- }
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- if(_endpoint.isDetached())
- {
- throw new LinkDetachedException(_error);
- }
- if(action != null)
- {
- _outcomeActions.put(message.getDeliveryTag(), action);
- }
- _endpoint.transfer(xfr);
- //TODO - rationalise sending of flows
- // _endpoint.sendFlow();
- }
-
- if(_windowSize != 0)
- {
- synchronized(lock)
- {
-
-
- while(_endpoint.getUnsettledCount() >= _windowSize)
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
-
-
- }
-
- public void close() throws SenderClosingException
- {
-
- if(_windowSize != 0)
- {
- synchronized(_endpoint.getLock())
- {
-
-
- while(_endpoint.getUnsettledCount() > 0)
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- }
- _session.removeSender(this);
- _endpoint.setSource(null);
- _endpoint.detach();
- _closed = true;
-
- synchronized(_endpoint.getLock())
- {
- while(!_endpoint.isDetached())
- {
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderClosingException(e);
- }
- }
- }
- }
-
- public boolean isClosed()
- {
- return _closed;
- }
-
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
- {
- if(state instanceof Outcome)
- {
- OutcomeAction action;
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
- action.onOutcome(deliveryTag, (Outcome) state);
- }
- if(!Boolean.TRUE.equals(settled))
- {
- _endpoint.updateDisposition(deliveryTag, state, true);
- }
- }
- else if(state instanceof TransactionalState)
- {
- OutcomeAction action;
-
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
- action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
- }
-
- }
- }
-
- public SendingLinkEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public Map<Binary, DeliveryState> getRemoteUnsettled()
- {
- return _endpoint.getInitialUnsettledMap();
- }
-
- public Session getSession()
- {
- return _session;
- }
-
-
- private void remoteError()
- {
- if(_remoteErrorTask != null)
- {
- _remoteErrorTask.run();
- }
- }
-
-
- public void setRemoteErrorListener(Runnable listener)
- {
- _remoteErrorTask = listener;
- }
-
- public Error getError()
- {
- return _error;
- }
-
- public class SenderCreationException extends Exception
- {
- public SenderCreationException(Throwable e)
- {
- super(e);
- }
-
- public SenderCreationException(String e)
- {
- super(e);
-
- }
- }
-
- public class SenderClosingException extends Exception
- {
- public SenderClosingException(Throwable e)
- {
- super(e);
- }
- }
-
- public static interface OutcomeAction
- {
- public void onOutcome(Binary deliveryTag, Outcome outcome);
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.Source; +import org.apache.qpid.amqp_1_0.type.Target; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; +import org.apache.qpid.amqp_1_0.type.transport.*; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.qpid.amqp_1_0.type.transport.Error; + +public class Sender implements DeliveryStateHandler +{ + private SendingLinkEndpoint _endpoint; + private int _id; + private Session _session; + private int _windowSize; + private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>()); + private boolean _closed; + private Error _error; + private Runnable _remoteErrorTask; + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, false); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + boolean synchronous) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window) throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO); + } + + + public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, + int window) throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, target, source, window, AcknowledgeMode.ALO); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window, AcknowledgeMode mode) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, window, mode, null); + } + + public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, + int window, AcknowledgeMode mode) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, target, source, window, mode, null); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled); + } + + public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, + int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled) + throws SenderCreationException, ConnectionClosedException + { + this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled); + } + + private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr) + { + org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source(); + source.setAddress(sourceAddr); + return source; + } + + private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable) + { + org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target(); + target.setAddress(targetAddr); + if(isDurable) + { + target.setDurable(TerminusDurability.UNSETTLED_STATE); + target.setExpiryPolicy(TerminusExpiryPolicy.NEVER); + } + return target; + } + + public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, + int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled) + throws SenderCreationException, ConnectionClosedException + { + + _session = session; + session.getConnection().checkNotClosed(); + _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName, + source, target, unsettled); + + + switch(mode) + { + case ALO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case AMO: + _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED); + break; + case EO: + _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND); + break; + + } + _endpoint.setDeliveryStateHandler(this); + _endpoint.attach(); + _windowSize = window; + + synchronized(_endpoint.getLock()) + { + while(!(_endpoint.isAttached() || _endpoint.isDetached())) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + throw new SenderCreationException(e); + } + } + if(_endpoint.getTarget()== null) + { + throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress()); + }; + } + + _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener() + { + + @Override + public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) + { + _error = detach.getError(); + if(_error != null) + { + remoteError(); + } + super.remoteDetached(endpoint, detach); + } + }); + } + + public Source getSource() + { + return _endpoint.getSource(); + } + + public Target getTarget() + { + return _endpoint.getTarget(); + } + + public void send(Message message) throws LinkDetachedException + { + send(message, null, null); + } + + public void send(Message message, final OutcomeAction action) throws LinkDetachedException + { + send(message, null, action); + } + + public void send(Message message, final Transaction txn) throws LinkDetachedException + { + send(message, txn, null); + } + + public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException + { + + List<Section> sections = message.getPayload(); + + Transfer xfr = new Transfer(); + + if(sections != null && !sections.isEmpty()) + { + SectionEncoder encoder = _session.getSectionEncoder(); + encoder.reset(); + + int sectionNumber = 0; + for(Section section : sections) + { + encoder.encodeObject(section); + } + + + Binary encoding = encoder.getEncoding(); + ByteBuffer payload = encoding.asByteBuffer(); + xfr.setPayload(payload); + } + if(message.getDeliveryTag() == null) + { + message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes())); + } + if(message.isResume()) + { + xfr.setResume(Boolean.TRUE); + } + if(message.getDeliveryState() != null) + { + xfr.setState(message.getDeliveryState()); + } + + xfr.setDeliveryTag(message.getDeliveryTag()); + //xfr.setSettled(_windowSize ==0); + if(txn != null) + { + xfr.setSettled(false); + TransactionalState deliveryState = new TransactionalState(); + deliveryState.setTxnId(txn.getTxnId()); + xfr.setState(deliveryState); + } + else + { + xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED); + } + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached()) + { + try + { + lock.wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + if(_endpoint.isDetached()) + { + throw new LinkDetachedException(_error); + } + if(action != null) + { + _outcomeActions.put(message.getDeliveryTag(), action); + } + _endpoint.transfer(xfr); + //TODO - rationalise sending of flows + // _endpoint.sendFlow(); + } + + if(_windowSize != 0) + { + synchronized(lock) + { + + + while(_endpoint.getUnsettledCount() >= _windowSize) + { + try + { + lock.wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + } + + + } + + public void close() throws SenderClosingException + { + + if(_windowSize != 0) + { + synchronized(_endpoint.getLock()) + { + + + while(_endpoint.getUnsettledCount() > 0) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + + } + _session.removeSender(this); + _endpoint.setSource(null); + _endpoint.detach(); + _closed = true; + + synchronized(_endpoint.getLock()) + { + while(!_endpoint.isDetached()) + { + try + { + _endpoint.getLock().wait(); + } + catch (InterruptedException e) + { + throw new SenderClosingException(e); + } + } + } + } + + public boolean isClosed() + { + return _closed; + } + + public void handle(Binary deliveryTag, DeliveryState state, Boolean settled) + { + if(state instanceof Outcome) + { + OutcomeAction action; + if((action = _outcomeActions.remove(deliveryTag)) != null) + { + action.onOutcome(deliveryTag, (Outcome) state); + } + if(!Boolean.TRUE.equals(settled)) + { + _endpoint.updateDisposition(deliveryTag, state, true); + } + } + else if(state instanceof TransactionalState) + { + OutcomeAction action; + + if((action = _outcomeActions.remove(deliveryTag)) != null) + { + action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome()); + } + + } + } + + public SendingLinkEndpoint getEndpoint() + { + return _endpoint; + } + + public Map<Binary, DeliveryState> getRemoteUnsettled() + { + return _endpoint.getInitialUnsettledMap(); + } + + public Session getSession() + { + return _session; + } + + + private void remoteError() + { + if(_remoteErrorTask != null) + { + _remoteErrorTask.run(); + } + } + + + public void setRemoteErrorListener(Runnable listener) + { + _remoteErrorTask = listener; + } + + public Error getError() + { + return _error; + } + + public class SenderCreationException extends Exception + { + public SenderCreationException(Throwable e) + { + super(e); + } + + public SenderCreationException(String e) + { + super(e); + + } + } + + public class SenderClosingException extends Exception + { + public SenderClosingException(Throwable e) + { + super(e); + } + } + + public static interface OutcomeAction + { + public void onOutcome(Binary deliveryTag, Outcome outcome); + } +} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java index 182d904a9c..79ed3b4457 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -1,384 +1,384 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionState;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class Session
-{
- private SessionEndpoint _endpoint;
- private List<Receiver> _receivers = new ArrayList<Receiver>();
- private List<Sender> _senders = new ArrayList<Sender>();
- private SectionEncoder _sectionEncoder;
- private SectionDecoder _sectionDecoder;
- private TransactionController _sessionLocalTC;
- private Connection _connection;
-
- public Session(final Connection connection, String name)
- {
- _connection = connection;
- _endpoint = connection.getEndpoint().createSession(name);
- _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
- _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
- }
-
-
- public synchronized Sender createSender(final String targetName)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, false);
- }
-
- public synchronized Sender createSender(final String targetName, boolean synchronous)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
-
- final String sourceName = UUID.randomUUID().toString();
- return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous);
-
- }
-
- public synchronized Sender createSender(final String targetName, int window)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- final String sourceName = UUID.randomUUID().toString();
- return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
-
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
-
- return createSender(targetName, window, mode, null);
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, window, mode, linkName, null);
- }
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, window, mode, linkName, false, unsettled);
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
- targetName, null, window, mode, isDurable, unsettled);
-
- }
-
-
- public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
- }
-
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName, isDurable);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
- }
-
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
- }
-
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
- }
-
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, ackMode, null);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr,mode, ackMode, linkName, false);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
- }
-
- public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
-
- final Target target = new Target();
- final Source source = new Source();
- source.setAddress(sourceAddr);
- source.setDistributionMode(mode);
- source.setFilter(filters);
-
- if(linkName == null)
- {
- linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
- }
-
- final Receiver receiver =
- new Receiver(this, linkName,
- target, source, ackMode, isDurable, unsettled);
- _receivers.add(receiver);
-
- return receiver;
-
- }
-
- public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, StdDistMode.COPY);
- }
-
- public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, StdDistMode.MOVE);
- }
-
- public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
- {
- Source source = new Source();
- source.setDynamic(true);
-
- final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
- source, AcknowledgeMode.ALO);
- _receivers.add(receiver);
- return receiver;
- }
-
- public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
- {
- Target target = new Target();
- target.setDynamic(true);
-
- final Sender sender;
- sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
- new Source(), 0, AcknowledgeMode.ALO);
- _senders.add(sender);
- return sender;
- }
-
-
-
- public SessionEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public synchronized void close()
- {
- try
- {
- for(Sender sender : new ArrayList<Sender>(_senders))
- {
- sender.close();
- }
- for(Receiver receiver : new ArrayList<Receiver>(_receivers))
- {
- receiver.detach();
- }
- if(_sessionLocalTC != null)
- {
- _sessionLocalTC.close();
- }
- _endpoint.end();
- }
- catch (Sender.SenderClosingException e)
- {
-// TODO
- e.printStackTrace();
- }
-
- //TODO
-
- }
-
- void removeSender(Sender sender)
- {
- _senders.remove(sender);
- }
-
- void removeReceiver(Receiver receiver)
- {
- _receivers.remove(receiver);
- }
-
- public SectionEncoder getSectionEncoder()
- {
- return _sectionEncoder;
- }
-
- public SectionDecoder getSectionDecoder()
- {
- return _sectionDecoder;
- }
-
-
- public Transaction createSessionLocalTransaction()
- {
- TransactionController localController = getSessionLocalTransactionController();
- return localController.beginTransaction();
- }
-
- private TransactionController getSessionLocalTransactionController()
- {
- if(_sessionLocalTC == null)
- {
- _sessionLocalTC = createSessionLocalTransactionController();
- }
- return _sessionLocalTC;
- }
-
- private TransactionController createSessionLocalTransactionController()
- {
- String name = "txnControllerLink";
- SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
- TxnCapability.MULTI_TXNS_PER_SSN);
- tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- tcLinkEndpoint.attach();
- return new TransactionController(this, tcLinkEndpoint);
- }
-
-
- public Message receive()
- {
- while(getEndpoint().getState() == SessionState.ACTIVE)
- {
- synchronized (getEndpoint().getLock())
- {
- try
- {
- for(Receiver r : _receivers)
- {
- Message m = r.receive(false);
- if(m != null)
- return m;
- }
- wait();
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- return null;
- }
-
- public Connection getConnection()
- {
- return _connection;
- }
-
- public void awaitActive()
- {
- synchronized(getEndpoint().getLock())
- {
- while(!getEndpoint().isEnded() && !getEndpoint().isActive())
- {
- try
- {
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; +import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; +import org.apache.qpid.amqp_1_0.transport.SessionState; +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.messaging.Filter; +import org.apache.qpid.amqp_1_0.type.messaging.Source; +import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; +import org.apache.qpid.amqp_1_0.type.messaging.Target; +import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; +import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class Session +{ + private SessionEndpoint _endpoint; + private List<Receiver> _receivers = new ArrayList<Receiver>(); + private List<Sender> _senders = new ArrayList<Sender>(); + private SectionEncoder _sectionEncoder; + private SectionDecoder _sectionDecoder; + private TransactionController _sessionLocalTC; + private Connection _connection; + + public Session(final Connection connection, String name) + { + _connection = connection; + _endpoint = connection.getEndpoint().createSession(name); + _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); + _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); + } + + + public synchronized Sender createSender(final String targetName) + throws Sender.SenderCreationException, ConnectionClosedException + { + return createSender(targetName, false); + } + + public synchronized Sender createSender(final String targetName, boolean synchronous) + throws Sender.SenderCreationException, ConnectionClosedException + { + + final String sourceName = UUID.randomUUID().toString(); + return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous); + + } + + public synchronized Sender createSender(final String targetName, int window) + throws Sender.SenderCreationException, ConnectionClosedException + { + final String sourceName = UUID.randomUUID().toString(); + return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window); + + } + + public Sender createSender(String targetName, int window, AcknowledgeMode mode) + throws Sender.SenderCreationException, ConnectionClosedException + { + + return createSender(targetName, window, mode, null); + } + + public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName) + throws Sender.SenderCreationException, ConnectionClosedException + { + return createSender(targetName, window, mode, linkName, null); + } + public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled) + throws Sender.SenderCreationException, ConnectionClosedException + { + return createSender(targetName, window, mode, linkName, false, unsettled); + } + + public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, + boolean isDurable, Map<Binary, Outcome> unsettled) + throws Sender.SenderCreationException, ConnectionClosedException + { + return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName, + targetName, null, window, mode, isDurable, unsettled); + + } + + + public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException + { + return createReceiver(sourceAddr, null, AcknowledgeMode.ALO); + } + + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode); + } + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode, linkName); + } + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode, linkName, isDurable); + } + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable, + Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled); + } + + + public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, + boolean isDurable, Map<Binary, Outcome> unsettled) + throws ConnectionErrorException + { + return createReceiver(queue, null, mode, linkName, isDurable, unsettled); + } + + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode) + throws ConnectionErrorException + { + return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO); + } + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName) + throws ConnectionErrorException + { + return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName); + } + + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode) + throws ConnectionErrorException + { + return createReceiver(sourceAddr, mode, ackMode, null); + } + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode, String linkName) + throws ConnectionErrorException + { + return createReceiver(sourceAddr,mode, ackMode, linkName, false); + } + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode, String linkName, boolean isDurable) + throws ConnectionErrorException + { + return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null); + } + + private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode, String linkName, boolean isDurable, + Map<Binary, Outcome> unsettled) + throws ConnectionErrorException + { + return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled); + } + + public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, + final AcknowledgeMode ackMode, String linkName, boolean isDurable, + Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled) + throws ConnectionErrorException + { + + final Target target = new Target(); + final Source source = new Source(); + source.setAddress(sourceAddr); + source.setDistributionMode(mode); + source.setFilter(filters); + + if(linkName == null) + { + linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")"; + } + + final Receiver receiver = + new Receiver(this, linkName, + target, source, ackMode, isDurable, unsettled); + _receivers.add(receiver); + + return receiver; + + } + + public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException + { + return createReceiver(sourceAddr, StdDistMode.COPY); + } + + public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException + { + return createReceiver(sourceAddr, StdDistMode.MOVE); + } + + public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException + { + Source source = new Source(); + source.setDynamic(true); + + final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(), + source, AcknowledgeMode.ALO); + _receivers.add(receiver); + return receiver; + } + + public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException + { + Target target = new Target(); + target.setDynamic(true); + + final Sender sender; + sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target, + new Source(), 0, AcknowledgeMode.ALO); + _senders.add(sender); + return sender; + } + + + + public SessionEndpoint getEndpoint() + { + return _endpoint; + } + + public synchronized void close() + { + try + { + for(Sender sender : new ArrayList<Sender>(_senders)) + { + sender.close(); + } + for(Receiver receiver : new ArrayList<Receiver>(_receivers)) + { + receiver.detach(); + } + if(_sessionLocalTC != null) + { + _sessionLocalTC.close(); + } + _endpoint.end(); + } + catch (Sender.SenderClosingException e) + { +// TODO + e.printStackTrace(); + } + + //TODO + + } + + void removeSender(Sender sender) + { + _senders.remove(sender); + } + + void removeReceiver(Receiver receiver) + { + _receivers.remove(receiver); + } + + public SectionEncoder getSectionEncoder() + { + return _sectionEncoder; + } + + public SectionDecoder getSectionDecoder() + { + return _sectionDecoder; + } + + + public Transaction createSessionLocalTransaction() + { + TransactionController localController = getSessionLocalTransactionController(); + return localController.beginTransaction(); + } + + private TransactionController getSessionLocalTransactionController() + { + if(_sessionLocalTC == null) + { + _sessionLocalTC = createSessionLocalTransactionController(); + } + return _sessionLocalTC; + } + + private TransactionController createSessionLocalTransactionController() + { + String name = "txnControllerLink"; + SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN, + TxnCapability.MULTI_TXNS_PER_SSN); + tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + tcLinkEndpoint.attach(); + return new TransactionController(this, tcLinkEndpoint); + } + + + public Message receive() + { + while(getEndpoint().getState() == SessionState.ACTIVE) + { + synchronized (getEndpoint().getLock()) + { + try + { + for(Receiver r : _receivers) + { + Message m = r.receive(false); + if(m != null) + return m; + } + wait(); + } + catch (InterruptedException e) + { + } + } + } + return null; + } + + public Connection getConnection() + { + return _connection; + } + + public void awaitActive() + { + synchronized(getEndpoint().getLock()) + { + while(!getEndpoint().isEnded() && !getEndpoint().isActive()) + { + try + { + getEndpoint().getLock().wait(); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + } + } +} |
