summaryrefslogtreecommitdiff
path: root/qpid/java/amqp-1-0-client
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2015-04-15 09:47:28 +0000
committerAlex Rudyy <orudyy@apache.org>2015-04-15 09:47:28 +0000
commit0a0baee45ebcff44635907d457c4ff6810b09c87 (patch)
tree8bfb0f9eddbc23cff88af69be80ab3ce7d47011c /qpid/java/amqp-1-0-client
parent54aa3d7070da16ce55c28ccad3f7d0871479e461 (diff)
downloadqpid-python-0a0baee45ebcff44635907d457c4ff6810b09c87.tar.gz
QPID-6481: Move java source tree to top level
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1673693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-client')
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java43
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java430
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java129
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java343
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java279
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java246
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java234
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java342
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java235
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java531
-rw-r--r--qpid/java/amqp-1-0-client/pom.xml53
-rw-r--r--qpid/java/amqp-1-0-client/resources/LICENSE204
-rw-r--r--qpid/java/amqp-1-0-client/resources/NOTICE5
-rw-r--r--qpid/java/amqp-1-0-client/resources/README.txt7
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java28
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ChannelsExhaustedException.java39
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java423
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionClosedException.java31
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java45
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java39
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java40
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java321
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java670
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLOptions.java79
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java285
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java552
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java462
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvider.java273
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java39
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java49
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java226
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java38
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java29
-rw-r--r--qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory19
34 files changed, 0 insertions, 6768 deletions
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java
deleted file mode 100644
index 3bb26744c4..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.lang.reflect.InvocationTargetException;
-
-public class Command
-{
- public static void main(String[] args) throws
- ClassNotFoundException,
- NoSuchMethodException,
- InvocationTargetException,
- IllegalAccessException,
- InstantiationException
- {
- String name = args[0];
- String[] cmdArgs = new String[args.length-1];
- System.arraycopy(args,1,cmdArgs,0,args.length-1);
- name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase();
- Class<Util> clazz = (Class<Util>) Class.forName(name);
- Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs);
- util.run();
-
- }
-}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
deleted file mode 100644
index 09d19f4394..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
+++ /dev/null
@@ -1,430 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-public class Demo extends Util
-{
- private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:";
- private static final String OPCODE = "opcode";
- private static final String ACTION = "action";
- private static final String MESSAGE_ID = "message-id";
- private static final String VENDOR = "vendor";
- private static final String LOG = "log";
- private static final String RECEIVED = "received";
- private static final String TEST = "test";
- private static final String APACHE = "apache";
- private static final String SENT = "sent";
- private static final String LINK_REF = "link-ref";
- private static final String HOST = "host";
- private static final String PORT = "port";
- private static final String SASL_USER = "sasl-user";
- private static final String SASL_PASSWORD = "sasl-password";
- private static final String ROLE = "role";
- private static final String ADDRESS = "address";
- private static final String SENDER = "sender";
- private static final String SEND_MESSAGE = "send-message";
- private static final String ANNOUNCE = "announce";
- private static final String MESSAGE_VENDOR = "message-vendor";
- private static final String CREATE_LINK = "create-link";
-
- public static void main(String[] args)
- {
- new Demo(args).run();
- }
-
- public Demo(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return false;
- }
-
- public void run()
- {
-
- try
- {
-
- final String vendor = getArgs()[0];
- final String queue = "control";
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
-
- Receiver responseReceiver;
-
- responseReceiver = session.createTemporaryQueueReceiver();
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode(), null);
-
-
- Properties properties = new Properties();
- properties.setMessageId(java.util.UUID.randomUUID());
- properties.setReplyTo(responseReceiver.getAddress());
-
- HashMap appPropMap = new HashMap();
- ApplicationProperties appProperties = new ApplicationProperties(appPropMap);
-
- appPropMap.put(OPCODE, ANNOUNCE);
- appPropMap.put(VENDOR, vendor);
- appPropMap.put(ADDRESS,responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { properties, appProperties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- try
- {
- s.send(message1);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
- Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
-
-
- boolean done = false;
-
- while(!done)
- {
- boolean wait = true;
- Message m = responseReceiver.receive(false);
- if(m != null)
- {
- List<Section> payload = m.getPayload();
- wait = false;
- ApplicationProperties props = m.getApplicationProperties();
- Map map = props.getValue();
- String op = (String) map.get(OPCODE);
- if("reset".equals(op))
- {
- for(Sender sender : sendingLinks.values())
- {
- try
- {
- sender.close();
- Session session1 = sender.getSession();
- session1.close();
- session1.getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- for(Receiver receiver : receivingLinks.values())
- {
- try
- {
- receiver.close();
- receiver.getSession().close();
- receiver.getSession().getConnection().close();
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- sendingLinks.clear();
- receivingLinks.clear();
- }
- else if(CREATE_LINK.equals(op))
- {
- Object linkRef = map.get(LINK_REF);
- String host = (String) map.get(HOST);
- Object o = map.get(PORT);
- int port = Integer.parseInt(String.valueOf(o));
- String user = (String) map.get(SASL_USER);
- String password = (String) map.get(SASL_PASSWORD);
- String role = (String) map.get(ROLE);
- String address = (String) map.get(ADDRESS);
- System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password);
- try{
-
-
- Connection conn2 = new Connection(host, port, user, password, host);
- Session session2 = conn2.createSession();
- if(sendingLinks.containsKey(linkRef))
- {
- try
- {
- sendingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(receivingLinks.containsKey(linkRef))
- {
- try
- {
- receivingLinks.remove(linkRef).close();
- }
- catch (Exception e)
- {
-
- }
- }
- if(SENDER.equals(role))
- {
-
- System.err.println("%%% Creating sender (" + linkRef + ")");
- Sender sender = session2.createSender(address);
- sendingLinks.put(linkRef, sender);
- }
- else
- {
-
- System.err.println("%%% Creating receiver (" + linkRef + ")");
- Receiver receiver2 = session2.createReceiver(address);
- receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
- receivingLinks.put(linkRef, receiver2);
- }
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }
- else if(SEND_MESSAGE.equals(op))
- {
- Sender sender = sendingLinks.get(map.get(LINK_REF));
- Properties m2props = new Properties();
- Object messageId = map.get(MESSAGE_ID);
- m2props.setMessageId(messageId);
- Map m2propmap = new HashMap();
- m2propmap.put(OPCODE, TEST);
- m2propmap.put(VENDOR, vendor);
- ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
- Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
- try
- {
- sender.send(m2);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, SENT);
- m3propmap.put(MESSAGE_ID, messageId);
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, vendor);
-
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+messageId)));
- try
- {
- s.send(m3);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- responseReceiver.acknowledge(m);
- }
- else
- {
- for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet())
- {
- m = entry.getValue().receive(false);
- if(m != null)
- {
- wait = false;
-
- System.err.println("%%% Received message from " + entry.getKey());
-
- Properties mp = m.getProperties();
- ApplicationProperties ap = m.getApplicationProperties();
-
- Map m3propmap = new HashMap();
- m3propmap.put(OPCODE, LOG);
- m3propmap.put(ACTION, RECEIVED);
- m3propmap.put(MESSAGE_ID, mp.getMessageId());
- m3propmap.put(VENDOR, vendor);
- m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR));
-
- Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
- new AmqpValue("AMQP-"+mp.getMessageId())));
- try
- {
- s.send(m3);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
- entry.getValue().acknowledge(m);
- }
-
- }
- }
-
- if(wait)
- {
- try
- {
- Thread.sleep(500l);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- }
-
-
-
-
-
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- }
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return false;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
deleted file mode 100644
index 06440b8f19..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.commons.cli.Options;
-
-public class Dump extends Util
-{
- private static final String USAGE_STRING = "dump [options] <address>\n\nOptions:";
-
-
- protected Dump(String[] args)
- {
- super(args);
- }
-
- public static void main(String[] args)
- {
- new Dump(args).run();
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected void printUsage(Options options)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- protected void run()
- {
- final String queue = getArgs()[0];
-
- try
- {
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
-
- Sender s = session.createSender(queue, 10);
-
- Message message = new Message("dump me");
- message.setDeliveryTag(new Binary("dump".getBytes()));
-
- s.send(message);
-
- s.close();
- session.close();
- conn.close();
-
- } catch (Exception e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
-}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
deleted file mode 100644
index e65d1324ef..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class Filereceiver extends Util
-{
- private static final String USAGE_STRING = "filereceiver [options] <address> <directory>\n\nOptions:";
-
- protected Filereceiver(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false;
- }
-
- @Override
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
-
- }
-
- @Override
- protected void run()
- {
- final String queue = getArgs()[0];
- final String directoryName = getArgs()[1];
-
- try
- {
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
- final File directory = new File(directoryName);
- if(directory.isDirectory() && directory.canWrite())
- {
- File tmpDirectory = new File(directoryName, ".tmp");
- if(!tmpDirectory.exists())
- {
- tmpDirectory.mkdir();
- }
-
- String[] unsettledFiles = tmpDirectory.list();
-
- Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>();
- final Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>();
-
- Accepted accepted = new Accepted();
-
- for(String fileName : unsettledFiles)
- {
- File theFile = new File(tmpDirectory, fileName);
- if(theFile.isFile())
- {
- if(fileName.startsWith("~") && fileName.endsWith("~"))
- {
- theFile.delete();
- }
- else
- {
- int splitPoint = fileName.indexOf(".");
- String deliveryTagStr = fileName.substring(0,splitPoint);
- String actualFileName = fileName.substring(splitPoint+1);
-
- byte[] bytes = new byte[deliveryTagStr.length()/2];
-
-
- for(int i = 0; i < bytes.length; i++)
- {
- char c = deliveryTagStr.charAt(2*i);
- char d = deliveryTagStr.charAt(1+(2*i));
-
- bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4)
- | (d <= '9' ? d - '0' : d - 'W'));
-
- }
- Binary deliveryTag = new Binary(bytes);
- unsettled.put(deliveryTag, accepted);
- unsettledFileNames.put(deliveryTag, fileName);
- }
- }
-
- }
-
- Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(),
- unsettled);
-
- Map<Binary, Outcome> remoteUnsettled = r.getRemoteUnsettled();
-
- for(Map.Entry<Binary, String> entry : unsettledFileNames.entrySet())
- {
- if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey()))
- {
-
- File tmpFile = new File(tmpDirectory, entry.getValue());
- final File dest = new File(directory,
- entry.getValue().substring(entry.getValue().indexOf(".") + 1));
- if(dest.exists())
- {
- System.err.println("Duplicate detected - filename " + dest.getName());
- }
-
- tmpFile.renameTo(dest);
- }
- }
-
-
- int credit = 10;
-
- r.setCredit(UnsignedInteger.valueOf(credit), true);
-
-
- int received = 0;
- Message m = null;
- do
- {
- m = isBlock() && received == 0 ? r.receive() : r.receive(10000);
- if(m != null)
- {
- if(m.isResume() && unsettled.containsKey(m.getDeliveryTag()))
- {
- final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag());
- final File unsettledFile = new File(tmpDirectory,
- tmpFileName);
- r.acknowledge(m, new Receiver.SettledAction()
- {
- public void onSettled(final Binary deliveryTag)
- {
- int splitPoint = tmpFileName.indexOf(".");
-
- String fileName = tmpFileName.substring(splitPoint+1);
-
- final File dest = new File(directory, fileName);
- if(dest.exists())
- {
- System.err.println("Duplicate detected - filename " + dest.getName());
- }
- unsettledFile.renameTo(dest);
- unsettledFileNames.remove(deliveryTag);
- }
- });
- }
- else
- {
- received++;
- List<Section> sections = m.getPayload();
- Binary deliveryTag = m.getDeliveryTag();
- StringBuilder tagNameBuilder = new StringBuilder();
-
- ByteBuffer dtbuf = deliveryTag.asByteBuffer();
- while(dtbuf.hasRemaining())
- {
- tagNameBuilder.append(String.format("%02x", dtbuf.get()));
- }
-
-
- ApplicationProperties properties = null;
- List<Binary> data = new ArrayList<Binary>();
- int totalSize = 0;
- for(Section section : sections)
- {
- if(section instanceof ApplicationProperties)
- {
- properties = (ApplicationProperties) section;
- }
- else if(section instanceof AmqpValue)
- {
- AmqpValue value = (AmqpValue) section;
- if(value.getValue() instanceof Binary)
- {
- Binary binary = (Binary) value.getValue();
- data.add(binary);
- totalSize += binary.getLength();
-
- }
- else
- {
- // TODO exception
- }
- }
- else if(section instanceof Data)
- {
- Data value = (Data) section;
- Binary binary = value.getValue();
- data.add(binary);
- totalSize += binary.getLength();
-
- }
- }
- if(properties != null)
- {
- final String fileName = (String) properties.getValue().get("filename");
- byte[] fileData = new byte[totalSize];
- ByteBuffer buf = ByteBuffer.wrap(fileData);
- int offset = 0;
- for(Binary bin : data)
- {
- buf.put(bin.asByteBuffer());
- }
- File outputFile = new File(tmpDirectory, "~"+fileName+"~");
- if(outputFile.exists())
- {
- outputFile.delete();
- }
- FileOutputStream fos = new FileOutputStream(outputFile);
- fos.write(fileData);
- fos.flush();
- fos.close();
-
- final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." +
- fileName);
- outputFile.renameTo(unsettledFile);
- r.acknowledge(m, new Receiver.SettledAction()
- {
- public void onSettled(final Binary deliveryTag)
- {
- final File dest = new File(directory, fileName);
- if(dest.exists())
- {
- System.err.println("Duplicate detected - filename " + dest.getName());
- }
- unsettledFile.renameTo(dest);
-
- }
- });
-
- }
- }
- }
- }
- while(m != null);
-
-
- r.close();
- }
- else
- {
- System.err.println("No such directory: " + directoryName);
- }
- session.close();
- conn.close();
- }
- catch (ConnectionException e)
- {
- e.printStackTrace();
- }
- catch (FileNotFoundException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (IOException e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- public static void main(String[] args)
- {
- new Filereceiver(args).run();
- }
-}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
deleted file mode 100644
index c7bcd99312..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-
-public class Filesender extends Util
-{
- private static final String USAGE_STRING = "filesender [options] <address> <directory>\n\nOptions:";
-
- protected Filesender(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return false;
- }
-
- @Override
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
-
- }
-
- @Override
- protected void run()
- {
- final String queue = getArgs()[0];
- final String directoryName = getArgs()[1];
-
- try
- {
- MessageDigest md5 = MessageDigest.getInstance("MD5");
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
- File directory = new File(directoryName);
- if(directory.isDirectory() && directory.canWrite())
- {
-
- File tmpDirectory = new File(directoryName, ".tmp");
- if(!tmpDirectory.exists())
- {
- tmpDirectory.mkdir();
- }
-
- String[] unsettledFiles = tmpDirectory.list();
-
-
-
- Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>();
- Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>();
- for(String fileName : unsettledFiles)
- {
- File aFile = new File(tmpDirectory, fileName);
- if(aFile.canRead() && aFile.canWrite())
- {
- Binary deliveryTag = new Binary(md5.digest(fileName.getBytes()));
- unsettled.put(deliveryTag, null);
- unsettledFileNames.put(deliveryTag, fileName);
- }
- }
-
-
- Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(),
- unsettled);
-
- Map<Binary, DeliveryState> remoteUnsettled = s.getRemoteUnsettled();
-
- for(Map.Entry<Binary, String> entry: unsettledFileNames.entrySet())
- {
- if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey()))
- {
- (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue()));
- }
- }
-
- if(remoteUnsettled != null)
- {
- for(Map.Entry<Binary, DeliveryState> entry : remoteUnsettled.entrySet())
- {
- if(entry.getValue() instanceof Accepted)
- {
- final String fileName = unsettledFileNames.get(entry.getKey());
- if(fileName != null)
- {
-
- Message resumed = new Message();
- resumed.setDeliveryTag(entry.getKey());
- resumed.setDeliveryState(entry.getValue());
- resumed.setResume(Boolean.TRUE);
- resumed.setSettled(Boolean.TRUE);
-
-
-
- final File unsettledFile = new File(tmpDirectory, fileName);
- unsettledFile.delete();
-
- s.send(resumed);
-
- }
-
- }
- else if(entry.getValue() instanceof Received || entry.getValue() == null)
- {
- final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey()));
- Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile);
- resumed.setResume(Boolean.TRUE);
- Sender.OutcomeAction action = new Sender.OutcomeAction()
- {
- public void onOutcome(Binary deliveryTag, Outcome outcome)
- {
- if(outcome instanceof Accepted)
- {
- unsettledFile.delete();
- }
- }
- };
- s.send(resumed, action);
-
- }
- }
- }
-
-
-
- String[] files = directory.list();
-
- for(String fileName : files)
- {
- final File file = new File(directory, fileName);
-
- if(file.canRead() && file.canWrite() && !file.isDirectory())
- {
- Message message = createMessageFromFile(md5, fileName, file);
-
- final File unsettledFile = new File(tmpDirectory, fileName);
-
- Sender.OutcomeAction action = new Sender.OutcomeAction()
- {
- public void onOutcome(Binary deliveryTag, Outcome outcome)
- {
- if(outcome instanceof Accepted)
- {
- unsettledFile.delete();
- }
- }
- };
-
- file.renameTo(unsettledFile);
-
- s.send(message, action);
- }
- }
-
- s.close();
- }
- else
- {
- System.err.println("No such directory: " + directory);
- }
- session.close();
- conn.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException
- {
- FileInputStream fis = new FileInputStream(file);
- byte[] data = new byte[(int) file.length()];
-
- int read = fis.read(data);
-
- fis.close();
-
- Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName));
- Section amqpValue = new Data(new Binary(data));
- Message message = new Message(Arrays.asList(applicationProperties, amqpValue));
- Binary deliveryTag = new Binary(md5.digest(fileName.getBytes()));
- message.setDeliveryTag(deliveryTag);
- md5.reset();
- return message;
- }
-
- public static void main(String[] args)
- {
- new Filesender(args).run();
- }
-}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
deleted file mode 100644
index a084c0bacc..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.commons.cli.*;
-import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
-
-import java.util.Collections;
-
-public class Receive extends Util
-{
- private static final String USAGE_STRING = "receive [options] <address> \n\nOptions:";
- private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L);
- private UnsignedLong _lastCorrelationId;
-
- public static void main(String[] args)
- {
- new Receive(args).run();
- }
-
-
- public Receive(final String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasFilterOption()
- {
- return true;
- }
-
- protected void run()
- {
-
- try
- {
- final String queue = getArgs()[0];
-
- String message = "";
-
- Connection conn = newConnection();
-
-
- Session session = conn.createSession();
-
- Filter filter = null;
- if(getFilter() != null)
- {
- String[] filterParts = getFilter().split("=",2);
- if("exact-subject".equals(filterParts[0]))
- {
- filter = new ExactSubjectFilter(filterParts[1]);
- }
- else if("matching-subject".equals(filterParts[0]))
- {
- filter = new MatchingSubjectFilter(filterParts[1]);
- }
- else
- {
- System.err.println("Unknown filter type: " + filterParts[0]);
- }
- }
-
- Receiver r =
- filter == null
- ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink())
- : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null);
- Transaction txn = null;
-
- int credit = 0;
- int receivedCount = 0;
-
- if(!useStdIn())
- {
- if(getArgs().length <= 2)
- {
-
- Transaction txn2 = null;
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- txn2 = session.createSessionLocalTransaction();
- }
-
- for(int i = 0; i < getCount(); i++)
- {
-
- if(credit == 0)
- {
- if(getCount() - i <= getWindowSize())
- {
- credit = getCount() - i;
-
- }
- else
- {
- credit = getWindowSize();
-
- }
-
- {
- r.setCredit(UnsignedInteger.valueOf(credit), false);
- }
- if(!isBlock())
- r.drain();
- }
-
- Message m = isBlock() ? r.receive() : r.receive(1000L);
- credit--;
- if(m==null)
- {
- break;
- }
-
-
-
- r.acknowledge(m.getDeliveryTag(),txn);
-
- receivedCount++;
-
- System.out.println("Received Message : " + m.getPayload());
- }
-
- if(useTran())
- {
- txn.commit();
- }
- }
- else
- {
- // TODO
- }
- }
- else
- {
- // TODO
- }
- r.close();
- session.close();
- conn.close();
- System.out.println("Total Messages Received: " + receivedCount);
- }
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (LinkDetachedException e)
- {
- e.printStackTrace();
- }
-
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
deleted file mode 100644
index bce7bfcd9a..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-import java.util.Arrays;
-
-public class Request extends Util
-{
- private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:";
-
- public static void main(String[] args)
- {
- new Request(args).run();
- }
-
- public Request(String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- public void run()
- {
-
- try
- {
-
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- Connection conn = newConnection();
- Session session = conn.createSession();
-
- Connection conn2;
- Session session2;
- Receiver responseReceiver;
-
- if(isUseMultipleConnections())
- {
- conn2 = newConnection();
- session2 = conn2.createSession();
- responseReceiver = session2.createTemporaryQueueReceiver();
- }
- else
- {
- conn2 = null;
- session2 = null;
- responseReceiver = session.createTemporaryQueueReceiver();
- }
-
-
-
-
- responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode(), null);
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- int received = 0;
-
- if(getArgs().length >= 2)
- {
- message = getArgs()[1];
- if(message.length() < getMessageSize())
- {
- StringBuilder builder = new StringBuilder(getMessageSize());
- builder.append(message);
- for(int x = message.length(); x < getMessageSize(); x++)
- {
- builder.append('.');
- }
- message = builder.toString();
- }
-
- for(int i = 0; i < getCount(); i++)
- {
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- properties.setReplyTo(responseReceiver.getAddress());
-
- AmqpValue amqpValue = new AmqpValue(message);
- Section[] sections = { new Header() , properties, amqpValue};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
-
- Message responseMessage = responseReceiver.receive(false);
- if(responseMessage != null)
- {
- responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn);
- received++;
- }
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
-
- while(received < getCount())
- {
- Message responseMessage = responseReceiver.receive();
- responseReceiver.acknowledge(responseMessage.getDeliveryTag());
- received++;
- }
-
-
-
-
- s.close();
- session.close();
- conn.close();
-
- if(session2 != null)
- {
- session2.close();
- conn2.close();
- }
- }
- catch (Exception e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return true;
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
deleted file mode 100644
index e29323eb80..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-public class Respond extends Util
-{
- private static final String USAGE_STRING = "respond [options] <address>\n\nOptions:";
- private Connection _conn;
- private Session _session;
- private Receiver _receiver;
- private Transaction _txn;
- private Map<String,Sender> _senders;
- private UnsignedLong _responseMsgId = UnsignedLong.ZERO;
- private Connection _conn2;
- private Session _session2;
-
- public Respond(final String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return true;
- }
-
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- public static void main(String[] args)
- {
- new Respond(args).run();
- }
-
- public void run()
- {
- try
- {
-
- _senders = new HashMap<String, Sender>();
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- _conn = newConnection();
-
-
-
- if(isUseMultipleConnections())
- {
- _conn2 = newConnection();
- _session2 = _conn2.createSession();
- }
-
-
- _session = _conn.createSession();
-
-
- _receiver = _session.createReceiver(queue, getMode());
- _txn = null;
-
- int credit = 0;
- int receivedCount = 0;
- _responseMsgId = UnsignedLong.ZERO;
-
- Random random = null;
- int batch = 0;
- List<Message> txnMessages = null;
- if(useTran())
- {
- if(getRollbackRatio() != 0)
- {
- random = new Random();
- }
- batch = getBatchSize();
- _txn = _session.createSessionLocalTransaction();
- txnMessages = new ArrayList<Message>(batch);
- }
-
-
- for(int i = 0; receivedCount < getCount(); i++)
- {
-
- if(credit == 0)
- {
- if(getCount() - i <= getWindowSize())
- {
- credit = getCount() - i;
-
- }
- else
- {
- credit = getWindowSize();
-
- }
-
- _receiver.setCredit(UnsignedInteger.valueOf(credit), false);
-
- if(!isBlock())
- _receiver.drain();
- }
-
- Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L);
- credit--;
- if(m==null)
- {
- if(useTran() && batch != getBatchSize())
- {
- _txn.commit();
- }
- break;
- }
-
- System.out.println("Received Message: " + m.getPayload());
-
- respond(m);
-
-
-
- if(useTran())
- {
-
- txnMessages.add(m);
-
- if(--batch == 0)
- {
-
- if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio())
- {
- _txn.commit();
- txnMessages.clear();
- receivedCount += getBatchSize();
- }
- else
- {
- System.out.println("Random Rollback");
- _txn.rollback();
- double result;
- do
- {
- _txn = _session.createSessionLocalTransaction();
-
- for(Message msg : txnMessages)
- {
- respond(msg);
- }
-
- result = random.nextDouble();
- if(result<getRollbackRatio())
- {
- _txn.rollback();
- }
- else
- {
- _txn.commit();
- txnMessages.clear();
- receivedCount += getBatchSize();
- }
- }
- while(result < getRollbackRatio());
- }
- _txn = _session.createSessionLocalTransaction();
-
- batch = getBatchSize();
- }
- }
- else
- {
- receivedCount++;
- }
-
- }
-
-
- for(Sender s : _senders.values())
- {
- s.close();
- }
-
- _receiver.close();
- _session.close();
- _conn.close();
- System.out.println("Received: " + receivedCount);
- }
- catch (Exception e)
- {
- e.printStackTrace(); //TODO.
- }
- }
-
- private void respond(Message m)
- throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException, TimeoutException
- {
- List<Section> sections = m.getPayload();
- String replyTo = null;
- Object correlationId = null;
- for(Section section : sections)
- {
- if(section instanceof Properties)
- {
- replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue();
- correlationId = ((Properties) section).getMessageId();
- break;
- }
- }
-
- if(replyTo != null)
- {
- Sender s = _senders.get(replyTo);
- if(s == null)
- {
- s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize());
- _senders.put(replyTo, s);
- }
-
- List<Section> replySections = new ArrayList<Section>(sections);
-
- ListIterator<Section> sectionIterator = replySections.listIterator();
-
- while(sectionIterator.hasNext())
- {
- Section section = sectionIterator.next();
- if(section instanceof Properties)
- {
- Properties newProps = new Properties();
- newProps.setTo(replyTo);
- newProps.setCorrelationId(correlationId);
- newProps.setMessageId(_responseMsgId);
- _responseMsgId = _responseMsgId.add(UnsignedLong.ONE);
- sectionIterator.set(newProps);
- }
- }
-
- Message replyMessage = new Message(replySections);
- System.out.println("Sent Message: " + replySections);
- s.send(replyMessage, _txn);
-
- }
- _receiver.acknowledge(m.getDeliveryTag(), _txn);
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
deleted file mode 100644
index 36aadc7851..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
-import java.util.Arrays;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedLong;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.commons.cli.*;
-
-public class Send extends Util
-{
- private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:";
- private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
-
-
- public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, ConnectionException
- {
- new Send(args).run();
- }
-
-
- public Send(final String[] args)
- {
- super(args);
- }
-
- @Override
- protected boolean hasLinkDurableOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasLinkNameOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasResponseQueueOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasBlockOption()
- {
- return false;
- }
-
- @Override
- protected boolean hasStdInOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasTxnOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasModeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasCountOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasWindowSizeOption()
- {
- return true;
- }
-
- @Override
- protected boolean hasSubjectOption()
- {
- return true;
- }
-
- public void run()
- {
-
- final String queue = getArgs()[0];
-
- String message = "";
-
- try
- {
- Connection conn = newConnection();
-
- Session session = conn.createSession();
-
-
- Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName());
-
- Transaction txn = null;
-
- if(useTran())
- {
- txn = session.createSessionLocalTransaction();
- }
-
- if(!useStdIn())
- {
- if(getArgs().length <= 2)
- {
- if(getArgs().length == 2)
- {
- message = getArgs()[1];
- }
- for(int i = 0; i < getCount(); i++)
- {
-
- Properties properties = new Properties();
- properties.setMessageId(UnsignedLong.valueOf(i));
- if(getSubject() != null)
- {
- properties.setSubject(getSubject());
- }
- Section bodySection;
- byte[] bytes = (message + " " + i).getBytes();
- if(bytes.length < getMessageSize())
- {
- byte[] origBytes = bytes;
- bytes = new byte[getMessageSize()];
- System.arraycopy(origBytes,0,bytes,0,origBytes.length);
- for(int x = origBytes.length; x < bytes.length; x++)
- {
- bytes[x] = (byte) '.';
- }
- bodySection = new Data(new Binary(bytes));
- }
- else
- {
- bodySection = new AmqpValue(message + " " + i);
- }
-
- Section[] sections = {properties, bodySection};
- final Message message1 = new Message(Arrays.asList(sections));
-
- s.send(message1, txn);
- }
- }
- else
- {
- for(int i = 1; i < getArgs().length; i++)
- {
- s.send(new Message(getArgs()[i]), txn);
- }
-
- }
- }
- else
- {
- LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in));
-
-
- try
- {
- while((message = buf.readLine()) != null)
- {
- s.send(new Message(message), txn);
- }
- }
- catch (IOException e)
- {
- // TODO
- e.printStackTrace();
- }
- }
-
- if(txn != null)
- {
- txn.commit();
- }
-
- s.close();
-
- session.close();
- conn.close();
- }
- catch (Exception e)
- {
- e.printStackTrace(); //TODO.
- }
-
- }
-
- protected void printUsage(Options options)
- {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(USAGE_STRING, options );
- }
-
-}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
deleted file mode 100644
index 4421c44a61..0000000000
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.commons.cli.*;
-
-import java.util.logging.*;
-
-public abstract class Util
-{
-
- private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
- private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
- private String _host;
- private String _username;
- private String _password;
- private int _port;
- private int _count;
- private boolean _useStdIn;
- private boolean _useTran;
- private String[] _args;
- private AcknowledgeMode _mode;
- private boolean _block;
- private int _frameSize;
- private int _messageSize;
- private String _responseQueue;
- private int _batchSize;
- private double _rollbackRatio;
- private String _linkName;
- private String _containerName;
- private boolean _durableLink;
- private boolean _useMultipleConnections;
- private int _windowSize = 100;
- private String _subject;
- private String _filter;
- private String _remoteHost;
- private boolean _useSSL;
-
- protected Util(String[] args)
- {
- CommandLineParser cmdLineParse = new PosixParser();
-
- Options options = new Options();
- options.addOption("h","help",false,"show this help message and exit");
- options.addOption(OptionBuilder.withLongOpt("host")
- .withDescription( "host to connect to (default 0.0.0.0)" )
- .hasArg(true)
- .withArgName("HOST")
- .create('H'));
- options.addOption(OptionBuilder.withLongOpt("username")
- .withDescription( "username to use for authentication" )
- .hasArg(true)
- .withArgName("USERNAME")
- .create('u'));
- options.addOption(OptionBuilder.withLongOpt("password")
- .withDescription( "password to use for authentication" )
- .hasArg(true)
- .withArgName("PASSWORD")
- .create('w'));
- options.addOption(OptionBuilder.withLongOpt("port")
- .withDescription( "port to connect to (default 5672)" )
- .hasArg(true)
- .withArgName("PORT")
- .create('p'));
- options.addOption(OptionBuilder.withLongOpt("frame-size")
- .withDescription( "specify the maximum frame size" )
- .hasArg(true)
- .withArgName("FRAME_SIZE")
- .create('f'));
- options.addOption(OptionBuilder.withLongOpt("container-name")
- .withDescription( "Container name" )
- .hasArg(true)
- .withArgName("CONTAINER_NAME")
- .create('C'));
-
- options.addOption(OptionBuilder.withLongOpt("ssl")
- .withDescription("Use SSL")
- .create('S'));
-
- options.addOption(OptionBuilder.withLongOpt("remote-hostname")
- .withDescription( "hostname to supply in the open frame" )
- .hasArg(true)
- .withArgName("HOST")
- .create('O'));
-
- if(hasBlockOption())
- options.addOption(OptionBuilder.withLongOpt("block")
- .withDescription("block until messages arrive")
- .create('b'));
-
- if(hasCountOption())
- options.addOption(OptionBuilder.withLongOpt("count")
- .withDescription( "number of messages to send (default 1)" )
- .hasArg(true)
- .withArgName("COUNT")
- .create('c'));
- if(hasModeOption())
- options.addOption(OptionBuilder.withLongOpt("acknowledge-mode")
- .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" )
- .hasArg(true)
- .withArgName("MODE")
- .create('k'));
-
- if(hasSubjectOption())
- options.addOption(OptionBuilder.withLongOpt("subject")
- .withDescription( "subject message property" )
- .hasArg(true)
- .withArgName("SUBJECT")
- .create('s'));
-
-
- if(hasSingleLinkPerConnectionMode())
- options.addOption(OptionBuilder.withLongOpt("single-link-per-connection")
- .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once")
- .hasArg(false)
- .create('Z'));
-
- if(hasFilterOption())
- options.addOption(OptionBuilder.withLongOpt("filter")
- .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#")
- .hasArg(true)
- .withArgName("<TYPE>=<VALUE>")
- .create('F'));
-
-
- if(hasTxnOption())
- {
- options.addOption("x","txn",false,"use transactions");
- options.addOption(OptionBuilder.withLongOpt("batch-size")
- .withDescription( "transaction batch size (default: 1)" )
- .hasArg(true)
- .withArgName("BATCH-SIZE")
- .create('B'));
- options.addOption(OptionBuilder.withLongOpt("rollback-ratio")
- .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" )
- .hasArg(true)
- .withArgName("RATIO")
- .create('R'));
- }
-
- if(hasLinkDurableOption())
- {
- options.addOption("d","durable-link",false,"use a durable link");
- }
-
- if(hasStdInOption())
- options.addOption("i","stdin",false,"read messages from stdin (one message per line)");
-
- options.addOption(OptionBuilder.withLongOpt("trace")
- .withDescription("trace logging specified categories: RAW, FRM")
- .hasArg(true)
- .withArgName("TRACE")
- .create('t'));
- if(hasSizeOption())
- options.addOption(OptionBuilder.withLongOpt("message-size")
- .withDescription( "size to pad outgoing messages to" )
- .hasArg(true)
- .withArgName("SIZE")
- .create('z'));
-
- if(hasResponseQueueOption())
- options.addOption(OptionBuilder.withLongOpt("response-queue")
- .withDescription( "response queue to reply to" )
- .hasArg(true)
- .withArgName("RESPONSE_QUEUE")
- .create('r'));
-
- if(hasLinkNameOption())
- {
- options.addOption(OptionBuilder.withLongOpt("link")
- .withDescription( "link name" )
- .hasArg(true)
- .withArgName("LINK")
- .create('l'));
- }
-
- if(hasWindowSizeOption())
- {
- options.addOption(OptionBuilder.withLongOpt("window-size")
- .withDescription("credit window size")
- .hasArg(true)
- .withArgName("WINDOW-SIZE")
- .create('W'));
- }
-
- CommandLine cmdLine = null;
- try
- {
- cmdLine = cmdLineParse.parse(options, args);
-
- }
- catch (ParseException e)
- {
- printUsage(options);
- System.exit(-1);
- }
-
- if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty())
- {
- printUsage(options);
- System.exit(0);
- }
- _host = cmdLine.getOptionValue('H',"0.0.0.0");
- _remoteHost = cmdLine.getOptionValue('O',null);
- String portStr = cmdLine.getOptionValue('p',"5672");
- String countStr = cmdLine.getOptionValue('c',"1");
-
- _useSSL = cmdLine.hasOption('S');
-
- if(hasWindowSizeOption())
- {
- String windowSizeStr = cmdLine.getOptionValue('W',"100");
- _windowSize = Integer.parseInt(windowSizeStr);
- }
-
- if(hasSubjectOption())
- {
- _subject = cmdLine.getOptionValue('s');
- }
-
- if(cmdLine.hasOption('u'))
- {
- _username = cmdLine.getOptionValue('u');
- }
-
- if(cmdLine.hasOption('w'))
- {
- _password = cmdLine.getOptionValue('w');
- }
-
- if(cmdLine.hasOption('F'))
- {
- _filter = cmdLine.getOptionValue('F');
- }
-
- _port = Integer.parseInt(portStr);
-
- _containerName = cmdLine.getOptionValue('C');
-
- if(hasBlockOption())
- _block = cmdLine.hasOption('b');
-
- if(hasLinkNameOption())
- _linkName = cmdLine.getOptionValue('l');
-
-
- if(hasLinkDurableOption())
- _durableLink = cmdLine.hasOption('d');
-
- if(hasCountOption())
- _count = Integer.parseInt(countStr);
-
- if(hasStdInOption())
- _useStdIn = cmdLine.hasOption('i');
-
- if(hasSingleLinkPerConnectionMode())
- _useMultipleConnections = cmdLine.hasOption('Z');
-
- if(hasTxnOption())
- {
- _useTran = cmdLine.hasOption('x');
- _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1"));
- _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0"));
- }
-
- if(hasModeOption())
- {
- _mode = AcknowledgeMode.ALO;
-
- if(cmdLine.hasOption('k'))
- {
- _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k'));
- }
- }
-
- if(hasResponseQueueOption())
- {
- _responseQueue = cmdLine.getOptionValue('r');
- }
-
- _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536"));
-
- if(hasSizeOption())
- {
- _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1"));
- }
-
- String categoriesList = cmdLine.getOptionValue('t');
- String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]");
- for(String cat : categories)
- {
- if(cat.equalsIgnoreCase("FRM"))
- {
- FRAME_LOGGER.setLevel(Level.FINE);
- Formatter formatter = new Formatter()
- {
- @Override
- public String format(final LogRecord record)
- {
- return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n";
- }
- };
- for(Handler handler : FRAME_LOGGER.getHandlers())
- {
- FRAME_LOGGER.removeHandler(handler);
- }
- Handler handler = new ConsoleHandler();
- handler.setLevel(Level.FINE);
- handler.setFormatter(formatter);
- FRAME_LOGGER.addHandler(handler);
- }
- else if (cat.equalsIgnoreCase("RAW"))
- {
- RAW_LOGGER.setLevel(Level.FINE);
- Formatter formatter = new Formatter()
- {
- @Override
- public String format(final LogRecord record)
- {
- return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n";
- }
- };
- for(Handler handler : RAW_LOGGER.getHandlers())
- {
- RAW_LOGGER.removeHandler(handler);
- }
- Handler handler = new ConsoleHandler();
- handler.setLevel(Level.FINE);
- handler.setFormatter(formatter);
- RAW_LOGGER.addHandler(handler);
- }
- }
-
-
- _args = cmdLine.getArgs();
-
- }
-
- protected boolean hasFilterOption()
- {
- return false;
- }
-
- protected boolean hasSubjectOption()
- {
- return false;
- }
-
- protected boolean hasWindowSizeOption()
- {
- return false;
- }
-
- protected boolean hasSingleLinkPerConnectionMode()
- {
- return false;
- }
-
- protected abstract boolean hasLinkDurableOption();
-
- protected abstract boolean hasLinkNameOption();
-
- protected abstract boolean hasResponseQueueOption();
-
- protected abstract boolean hasSizeOption();
-
- protected abstract boolean hasBlockOption();
-
- protected abstract boolean hasStdInOption();
-
- protected abstract boolean hasTxnOption();
-
- protected abstract boolean hasModeOption();
-
- protected abstract boolean hasCountOption();
-
- public String getHost()
- {
- return _host;
- }
-
- public String getUsername()
- {
- return _username;
- }
-
- public String getPassword()
- {
- return _password;
- }
-
- public int getPort()
- {
- return _port;
- }
-
- public int getCount()
- {
- return _count;
- }
-
- public boolean useStdIn()
- {
- return _useStdIn;
- }
-
- public boolean useTran()
- {
- return _useTran;
- }
-
- public AcknowledgeMode getMode()
- {
- return _mode;
- }
-
- public boolean isBlock()
- {
- return _block;
- }
-
- public String[] getArgs()
- {
- return _args;
- }
-
- public int getMessageSize()
- {
- return _messageSize;
- }
-
- public String getResponseQueue()
- {
- return _responseQueue;
- }
-
- public int getBatchSize()
- {
- return _batchSize;
- }
-
- public double getRollbackRatio()
- {
- return _rollbackRatio;
- }
-
- public String getLinkName()
- {
- return _linkName;
- }
-
- public boolean isDurableLink()
- {
- return _durableLink;
- }
-
- public boolean isUseMultipleConnections()
- {
- return _useMultipleConnections;
- }
-
- public void setUseMultipleConnections(boolean useMultipleConnections)
- {
- _useMultipleConnections = useMultipleConnections;
- }
-
- public String getSubject()
- {
- return _subject;
- }
-
- public void setSubject(String subject)
- {
- _subject = subject;
- }
-
- protected abstract void printUsage(final Options options);
-
- protected abstract void run();
-
-
- public Connection newConnection() throws ConnectionException
- {
- Container container = getContainerName() == null ? new Container() : new Container(getContainerName());
- return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container,
- _remoteHost == null ? getHost() : _remoteHost, _useSSL,
- 0)
- : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize,
- container, _remoteHost == null ? getHost() : _remoteHost, _useSSL,
- 0);
- }
-
- public String getContainerName()
- {
- return _containerName;
- }
-
- public int getWindowSize()
- {
- return _windowSize;
- }
-
- public void setWindowSize(int windowSize)
- {
- _windowSize = windowSize;
- }
-
- public String getFilter()
- {
- return _filter;
- }
-}
diff --git a/qpid/java/amqp-1-0-client/pom.xml b/qpid/java/amqp-1-0-client/pom.xml
deleted file mode 100644
index 5e5e6a547a..0000000000
--- a/qpid/java/amqp-1-0-client/pom.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-java-build</artifactId>
- <version>0.32-SNAPSHOT</version>
- </parent>
-
- <artifactId>qpid-amqp-1-0-client</artifactId>
- <name>Qpid AMQP 1.0 Client</name>
- <description>AMQP 1.0 compliant client module</description>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-amqp-1-0-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- <resource>
- <directory>src/main/java</directory>
- <includes>
- <include>resources/</include>
- </includes>
- </resource>
- </resources>
- </build>
-
-</project>
diff --git a/qpid/java/amqp-1-0-client/resources/LICENSE b/qpid/java/amqp-1-0-client/resources/LICENSE
deleted file mode 100644
index de4b130f35..0000000000
--- a/qpid/java/amqp-1-0-client/resources/LICENSE
+++ /dev/null
@@ -1,204 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
-
diff --git a/qpid/java/amqp-1-0-client/resources/NOTICE b/qpid/java/amqp-1-0-client/resources/NOTICE
deleted file mode 100644
index 8d1c3f3122..0000000000
--- a/qpid/java/amqp-1-0-client/resources/NOTICE
+++ /dev/null
@@ -1,5 +0,0 @@
-Apache Qpid
-Copyright 2006-2012 Apache Software Foundation
-This product includes software developed at
-Apache Software Foundation (http://www.apache.org/)
-
diff --git a/qpid/java/amqp-1-0-client/resources/README.txt b/qpid/java/amqp-1-0-client/resources/README.txt
deleted file mode 100644
index 35d25050fe..0000000000
--- a/qpid/java/amqp-1-0-client/resources/README.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-
-Documentation
---------------
-All of our user documentation can be accessed at:
-
-http://qpid.apache.org/documentation.html
-
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java
deleted file mode 100644
index 05d176bc35..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-public enum AcknowledgeMode
-{
- AMO,
- ALO,
- EO
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ChannelsExhaustedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ChannelsExhaustedException.java
deleted file mode 100644
index 1f23d02e02..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ChannelsExhaustedException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-public class ChannelsExhaustedException extends ConnectionException
-{
- protected ChannelsExhaustedException(final String message)
- {
- super(message);
- }
-
- public ChannelsExhaustedException(Throwable cause)
- {
- super(cause);
- }
-
- ChannelsExhaustedException()
- {
-
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
deleted file mode 100644
index a4f9ac5a3a..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.security.NoSuchAlgorithmException;
-import java.security.Principal;
-import java.util.ServiceLoader;
-import java.util.concurrent.TimeoutException;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
-import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.Predicate;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
-import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class Connection implements ExceptionHandler
-{
- private static final int MAX_FRAME_SIZE = 65536;
-
- private String _address;
- private ConnectionEndpoint _conn;
- private int _sessionCount;
- private Runnable _connectionErrorTask;
- private Error _socketError;
-
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password, String remoteHostname) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,new Container());
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container) throws ConnectionException
- {
- this(address,port,username,password,MAX_FRAME_SIZE,container);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,container, null);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname) throws ConnectionException
- {
- this(address,port,username,password,maxFrameSize,container,remoteHostname,false,-1);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl,-1);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final String remoteHost,
- final boolean ssl) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl,-1);
- }
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final String remoteHost,
- final boolean ssl,
- final int channelMax) throws ConnectionException
- {
- this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl,
- channelMax);
- }
-
-
- public Connection(final String protocol,
- final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final String remoteHost,
- final SSLContext sslContext,
- final int channelMax) throws ConnectionException
- {
- this(protocol, address, port, username, password,container,remoteHost,sslContext,
- null, channelMax);
- }
-
- public Connection(final String protocol,
- final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final String remoteHost,
- final SSLContext sslContext,
- final SSLOptions sslOptions,
- final int channelMax) throws ConnectionException
- {
- this(protocol, address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,sslContext,
- sslOptions, channelMax);
- }
-
-
- public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname,
- boolean ssl,
- int channelMax) throws ConnectionException
- {
- this(ssl?"amqps":"amqp",address,port,username,password,maxFrameSize,container,
- remoteHostname,
- getSslContext(ssl),
- null,
- channelMax);
- }
-
- private static SSLContext getSslContext(final boolean ssl) throws ConnectionException
- {
- try
- {
- return ssl ? SSLContext.getDefault() : null;
- }
- catch (NoSuchAlgorithmException e)
- {
- throw new ConnectionException(e);
- }
- }
-
- public Connection(final String protocol,
- final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname,
- SSLContext sslContext,
- final SSLOptions sslOptions, int channelMax) throws ConnectionException
- {
-
- _address = address;
-
-
- Principal principal = username == null ? null : new Principal()
- {
-
- public String getName()
- {
- return username;
- }
- };
- _conn = new ConnectionEndpoint(container, principal, password);
- if(channelMax >= 0)
- {
- _conn.setChannelMax((short)channelMax);
- }
- _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
- _conn.setRemoteHostname(remoteHostname);
-
- ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
-
- ConnectionHandler.BytesSource src;
-
- if(_conn.requiresSASL())
- {
- ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
-
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)3,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),_conn.getDescribedTypeRegistry()),
- new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry())
- );
-
- _conn.setSaslFrameOutput(saslOut);
- }
- else
- {
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry())
- );
- }
-
- TransportProvider transportProvider = getTransportProvider(protocol);
-
- transportProvider.connect(_conn,address,port, sslContext, sslOptions, this);
-
-
- try
- {
- _conn.open();
- }
- catch(RuntimeException e)
- {
- transportProvider.close();
- }
-
- }
-
- private TransportProvider getTransportProvider(final String protocol) throws ConnectionException
- {
- TCPTransportProviderFactory tcpTransportProviderFactory = new TCPTransportProviderFactory();
- if(tcpTransportProviderFactory.getSupportedTransports().contains(protocol))
- {
- return tcpTransportProviderFactory.getProvider(protocol);
- }
-
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- ServiceLoader<TransportProviderFactory> providerFactories = ServiceLoader.load(TransportProviderFactory.class, classLoader);
-
- for(TransportProviderFactory tpf : providerFactories)
- {
- if(tpf.getSupportedTransports().contains(protocol))
- {
- return tpf.getProvider(protocol);
- }
- }
-
- throw new ConnectionException("Unknown protocol: " + protocol);
- }
-
- private Connection(ConnectionEndpoint endpoint)
- {
- _conn = endpoint;
- }
-
-
- public Session createSession() throws ConnectionException
- {
- checkNotClosed();
- Session session = new Session(this,String.valueOf(_sessionCount++));
- return session;
- }
-
- void checkNotClosed() throws ConnectionClosedException
- {
- if(getEndpoint().isClosed())
- {
- Error remoteError = getEndpoint().getRemoteError();
- if(remoteError == null)
- {
- remoteError = new Error();
- remoteError.setDescription("Connection closed for unknown reason");
-
- }
- throw new ConnectionClosedException(remoteError);
- }
- }
-
- public ConnectionEndpoint getEndpoint()
- {
- return _conn;
- }
-
- public void awaitOpen() throws TimeoutException, InterruptedException
- {
- getEndpoint().waitUntil(new Predicate()
- {
- @Override
- public boolean isSatisfied()
- {
- return getEndpoint().isOpen() || getEndpoint().isClosed();
- }
- });
-
- }
-
- public void close() throws ConnectionErrorException
- {
- _conn.close();
-
- try
- {
- _conn.waitUntil(new Predicate()
- {
- @Override
- public boolean isSatisfied()
- {
- return _conn.closedForInput();
- }
- });
- }
- catch (InterruptedException e)
- {
- throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Interrupted while waiting for connection closure");
- }
- catch (TimeoutException e)
- {
- throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Timed out while waiting for connection closure");
- }
- if(_conn.getRemoteError() != null)
- {
- throw new ConnectionErrorException(_conn.getRemoteError());
- }
-
- }
-
- /**
- * Set the connection error task that will be used as a callback for any socket read/write errors.
- *
- * @param connectionErrorTask connection error task
- */
- public void setConnectionErrorTask(Runnable connectionErrorTask)
- {
- _connectionErrorTask = connectionErrorTask;
- }
-
- /**
- * Return the connection error for any socket read/write error that has occurred
- *
- * @return connection error
- */
- public Error getConnectionError()
- {
- return _socketError;
- }
-
- @Override
- public void handleException(Exception exception)
- {
- Error socketError = new Error();
- socketError.setDescription(exception.getClass() + ": " + exception.getMessage());
- socketError.setCondition(ConnectionError.SOCKET_ERROR);
- _socketError = socketError;
- if(_connectionErrorTask != null)
- {
- Thread thread = new Thread(_connectionErrorTask);
- thread.start();
- }
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionClosedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionClosedException.java
deleted file mode 100644
index a434a4aaaf..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionClosedException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-public class ConnectionClosedException extends ConnectionErrorException
-{
-
- public ConnectionClosedException(org.apache.qpid.amqp_1_0.type.transport.Error remoteError)
- {
- super(remoteError);
- }
-
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
deleted file mode 100644
index 82f29ea4b1..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.ErrorCondition;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class ConnectionErrorException extends ConnectionException
-{
- protected final Error _remoteError;
-
- public ConnectionErrorException(ErrorCondition condition,final String description)
- {
- this(new Error(condition, description));
- }
-
- public ConnectionErrorException(Error remoteError)
- {
- super(remoteError.getDescription() == null ? remoteError.toString() : remoteError.getDescription());
- _remoteError = remoteError;
- }
-
- public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError()
- {
- return _remoteError;
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java
deleted file mode 100644
index 569c1f129d..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-public class ConnectionException extends Exception
-{
- protected ConnectionException(final String message)
- {
- super(message);
- }
-
- public ConnectionException(Throwable cause)
- {
- super(cause);
- }
-
- ConnectionException()
- {
-
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
deleted file mode 100644
index 45b00255f2..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class LinkDetachedException extends Exception
-{
- private final org.apache.qpid.amqp_1_0.type.transport.Error _remoteError;
-
- public LinkDetachedException(Error remoteError)
- {
- super();
- _remoteError = remoteError;
- }
-
- public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError()
- {
- return _remoteError;
- }
-
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
deleted file mode 100644
index c4f9783c89..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
-import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations;
-import org.apache.qpid.amqp_1_0.type.messaging.Footer;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-public class Message
-{
-
- private static final Map<Class<? extends Section>, Collection<Class<? extends Section>>> VALID_NEXT_SECTIONS = new HashMap<>();
-
- static
- {
- VALID_NEXT_SECTIONS.put(null, Arrays.asList(Header.class,
- DeliveryAnnotations.class,
- MessageAnnotations.class,
- Properties.class,
- ApplicationProperties.class,
- AmqpValue.class,
- AmqpSequence.class,
- Data.class));
-
- VALID_NEXT_SECTIONS.put(Header.class, Arrays.asList(DeliveryAnnotations.class,
- MessageAnnotations.class,
- Properties.class,
- ApplicationProperties.class,
- AmqpValue.class,
- AmqpSequence.class,
- Data.class));
-
- VALID_NEXT_SECTIONS.put(DeliveryAnnotations.class, Arrays.asList(MessageAnnotations.class,
- Properties.class,
- ApplicationProperties.class,
- AmqpValue.class,
- AmqpSequence.class,
- Data.class));
-
- VALID_NEXT_SECTIONS.put(MessageAnnotations.class, Arrays.asList(Properties.class,
- ApplicationProperties.class,
- AmqpValue.class,
- AmqpSequence.class,
- Data.class));
-
- VALID_NEXT_SECTIONS.put(Properties.class, Arrays.asList(ApplicationProperties.class,
- AmqpValue.class,
- AmqpSequence.class,
- Data.class));
-
-
- VALID_NEXT_SECTIONS.put(ApplicationProperties.class, Arrays.asList(AmqpValue.class,
- AmqpSequence.class,
- Data.class));
-
- VALID_NEXT_SECTIONS.put(AmqpValue.class, Arrays.<Class<? extends Section>>asList(Footer.class, null));
-
- VALID_NEXT_SECTIONS.put(AmqpSequence.class, Arrays.asList(AmqpSequence.class,
- Footer.class, null));
-
- VALID_NEXT_SECTIONS.put(Data.class, Arrays.asList(Data.class, Footer.class, null));
-
- VALID_NEXT_SECTIONS.put(Footer.class, Collections.<Class<? extends Section>>singletonList(null));
-
-
- }
-
-
- private Binary _deliveryTag;
- private List<Section> _payload = new ArrayList<Section>();
- private Boolean _resume;
- private boolean _settled;
- private DeliveryState _deliveryState;
- private Receiver _receiver;
-
-
- public Message()
- {
- }
-
- public Message(Collection<Section> sections)
- {
- this(sections, true);
- }
-
- public Message(Collection<Section> sections, boolean validate)
- {
- _payload.addAll(validate ? validateOrReorder(sections) : sections);
- }
-
- public Message(Section section)
- {
- this(Collections.singletonList(section));
- }
-
- public Message(String message)
- {
- this(new AmqpValue(message));
- }
-
-
- private static Collection<Section> validateOrReorder(final Collection<Section> providedSections)
- {
- Collection<Section> validatedSections;
- if(providedSections == null)
- {
- validatedSections = Collections.emptyList();
- }
- else if(isValidOrder(providedSections))
- {
- validatedSections = providedSections;
- }
- else
- {
- validatedSections = reorderSections(providedSections);
- }
- return validatedSections;
- }
-
- private static Collection<Section> reorderSections(final Collection<Section> providedSections)
- {
- Collection<Section> validSections = new ArrayList<>();
- List<Section> originalSection = new ArrayList<>(providedSections);
- validSections.addAll(getAndRemoveSections(Header.class, originalSection, false));
- validSections.addAll(getAndRemoveSections(DeliveryAnnotations.class, originalSection, false));
- validSections.addAll(getAndRemoveSections(MessageAnnotations.class, originalSection, false));
- validSections.addAll(getAndRemoveSections(Properties.class, originalSection, false));
- validSections.addAll(getAndRemoveSections(ApplicationProperties.class, originalSection, false));
-
- final List<AmqpValue> valueSections = getAndRemoveSections(AmqpValue.class, originalSection, false);
- final List<AmqpSequence> sequenceSections = getAndRemoveSections(AmqpSequence.class, originalSection, true);
- final List<Data> dataSections = getAndRemoveSections(Data.class, originalSection, true);
-
- if(valueSections.isEmpty() && sequenceSections.isEmpty() && dataSections.isEmpty())
- {
- throw new IllegalArgumentException("Message must contain one of Data, AmqpValue or AmqpSequence");
- }
- if((!valueSections.isEmpty() && (!sequenceSections.isEmpty() || !dataSections.isEmpty()))
- || (!sequenceSections.isEmpty() && !dataSections.isEmpty()))
- {
- throw new IllegalArgumentException("Only one type of content Data, AmqpValue or AmqpSequence can be used");
- }
- validSections.addAll(valueSections);
- validSections.addAll(sequenceSections);
- validSections.addAll(dataSections);
-
- validSections.addAll(getAndRemoveSections(Footer.class, originalSection, false));
-
- if(!originalSection.isEmpty())
- {
- throw new IllegalArgumentException("Invalid section type: " + originalSection.get(0).getClass().getName());
- }
- return validSections;
- }
-
- private static <T extends Section> List<T> getAndRemoveSections(Class<T> clazz,
- List<Section> sections,
- boolean allowMultiple)
- {
- List<T> desiredSections = new ArrayList<>();
- ListIterator<Section> iterator = sections.listIterator();
- while(iterator.hasNext())
- {
- Section s = iterator.next();
- if(s.getClass() == clazz)
- {
- desiredSections.add((T)s);
- iterator.remove();
- }
- }
- if(desiredSections.size() > 1 && !allowMultiple)
- {
- throw new IllegalArgumentException("Multiple " + clazz.getSimpleName() + " sections are not allowed");
- }
- return desiredSections;
- }
-
- private static boolean isValidOrder(final Collection<Section> providedSections)
- {
- Class<? extends Section> previousSection = null;
- final Iterator<? extends Section> it = providedSections.iterator();
- while(it.hasNext())
- {
- Collection<Class<? extends Section>> validSections = VALID_NEXT_SECTIONS.get(previousSection);
- Section next = it.next();
- Class<? extends Section> sectionClass = next.getClass();
- if(validSections == null || !validSections.contains(sectionClass))
- {
- return false;
- }
- else
- {
- previousSection = sectionClass;
- }
- }
- Collection<Class<? extends Section>> validSections = VALID_NEXT_SECTIONS.get(previousSection);
- return validSections != null && validSections.contains(null);
- }
-
-
-
- public Binary getDeliveryTag()
- {
- return _deliveryTag;
- }
-
- public void setDeliveryTag(Binary deliveryTag)
- {
- _deliveryTag = deliveryTag;
- }
-
- public List<Section> getPayload()
- {
- return Collections.unmodifiableList(_payload);
- }
-
- private <T extends Section> T getSection(Class<T> clazz)
- {
- for(Section s : _payload)
- {
- if(clazz.isAssignableFrom(s.getClass()))
- {
- return (T) s;
- }
- }
- return null;
- }
-
- public ApplicationProperties getApplicationProperties()
- {
- return getSection(ApplicationProperties.class);
- }
-
- public Properties getProperties()
- {
- return getSection(Properties.class);
- }
-
- public Header getHeader()
- {
- return getSection(Header.class);
- }
-
-
- public void setResume(final Boolean resume)
- {
- _resume = resume;
- }
-
- public boolean isResume()
- {
- return Boolean.TRUE.equals(_resume);
- }
-
- public void setDeliveryState(DeliveryState state)
- {
- _deliveryState = state;
- }
-
- public DeliveryState getDeliveryState()
- {
- return _deliveryState;
- }
-
- public void setSettled(boolean settled)
- {
- _settled = settled;
- }
-
- public boolean getSettled()
- {
- return _settled;
- }
-
- public void setReceiver(final Receiver receiver)
- {
- _receiver = receiver;
- }
-
- public Receiver getReceiver()
- {
- return _receiver;
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
deleted file mode 100644
index 5d4374fec5..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ /dev/null
@@ -1,670 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Predicate;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.ErrorCondition;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
-import org.apache.qpid.amqp_1_0.type.transport.Detach;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-public class Receiver implements DeliveryStateHandler
-{
- private static final ErrorCondition UNKNOWN_ERROR_CONDITION = new ErrorCondition()
- {
- @Override
- public Symbol getValue()
- {
- return Symbol.valueOf("Unknown");
- }
-
- @Override
- public String toString()
- {
- return getValue().toString();
- }
- };
- private ReceivingLinkEndpoint _endpoint;
- private int _id;
- private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100);
- private Session _session;
-
- private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
- private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
- private MessageArrivalListener _messageArrivalListener;
- private org.apache.qpid.amqp_1_0.type.transport.Error _error;
- private Runnable _remoteErrorTask;
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode) throws ConnectionErrorException
- {
- this(session, linkName, target, source, ackMode, false);
- }
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode,
- boolean isDurable) throws ConnectionErrorException
- {
- this(session,linkName,target,source,ackMode,isDurable,null);
- }
-
- public Receiver(final Session session,
- final String linkName,
- final Target target,
- final Source source,
- final AcknowledgeMode ackMode,
- final boolean isDurable,
- final Map<Binary,Outcome> unsettled) throws ConnectionErrorException
- {
-
- session.getConnection().checkNotClosed();
- _session = session;
- if(isDurable)
- {
- source.setDurable(TerminusDurability.UNSETTLED_STATE);
- source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
- else if(source != null)
- {
- source.setDurable(TerminusDurability.NONE);
- source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
- }
- _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
- UnsignedInteger.ZERO);
-
- _endpoint.setDeliveryStateHandler(this);
-
- switch(ackMode)
- {
- case ALO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case EO:
- _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
-
- _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
- {
- @Override public void messageTransfer(final Transfer xfr)
- {
- _prefetchQueue.add(xfr);
- postPrefetchAction();
- }
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _error = detach.getError();
- if(detach.getError()!=null)
- {
- remoteError();
- }
- super.remoteDetached(endpoint, detach);
- }
- });
-
- _endpoint.setLocalUnsettled(unsettled);
- _endpoint.attach();
-
- try
- {
- _endpoint.waitUntil(new Predicate()
- {
-
- @Override
- public boolean isSatisfied()
- {
- return _endpoint.isAttached() || _endpoint.isDetached();
- }
- });
- }
- catch (TimeoutException e)
- {
- throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for attach");
- }
- catch (InterruptedException e)
- {
- throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for attach");
- }
-
- if(_endpoint.getSource() == null)
- {
- try
- {
- _endpoint.waitUntil(new Predicate()
- {
- @Override
- public boolean isSatisfied()
- {
- return _endpoint.isDetached();
- }
- });
- }
- catch (TimeoutException e)
- {
- throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for detach following failed attach");
- }
- catch (InterruptedException e)
- {
- throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for detach following failed attach");
- }
-
- Error error = getError() == null
- ? new Error(UNKNOWN_ERROR_CONDITION, "Unknown")
- : getError();
-
-
- ErrorCondition condition = error.getCondition() == null ? UNKNOWN_ERROR_CONDITION : error.getCondition();
-
- throw new ConnectionErrorException(condition,
- error.getDescription() == null
- ? "AMQP error: '" + condition.toString()
- + "' when attempting to create a receiver"
- + (source != null ? " from: '" + source.getAddress() +"'" : "")
- : error.getDescription());
- }
- }
-
- private void remoteError()
- {
- if(_remoteErrorTask != null)
- {
- Thread thread = new Thread(_remoteErrorTask);
- thread.start();
- }
- }
-
- private void postPrefetchAction()
- {
- if(_messageArrivalListener != null)
- {
- _messageArrivalListener.messageArrived(this);
- }
- }
-
- public void setCredit(UnsignedInteger credit, boolean window)
- {
- _endpoint.setLinkCredit(credit);
- _endpoint.setCreditWindow(window);
-
- }
-
-
- public String getAddress()
- {
- return ((Source)_endpoint.getSource()).getAddress();
- }
-
- public Map getFilter()
- {
- return ((Source)_endpoint.getSource()).getFilter();
- }
-
- public Message receive()
- {
- return receive(-1L);
- }
-
- public Message receive(boolean wait)
- {
- return receive(wait ? -1L : 0L);
- }
-
- // 0 means no wait, -1 wait forever
- public Message receive(long wait)
- {
- Message m = null;
- Transfer xfr;
- long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
-
- while((xfr = receiveFromPrefetch(wait)) != null )
- {
-
- if(!Boolean.TRUE.equals(xfr.getAborted()))
- {
- Binary deliveryTag = xfr.getDeliveryTag();
- Boolean resume = xfr.getResume();
-
- List<Section> sections = new ArrayList<Section>();
- List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
- int totalSize = 0;
-
- boolean hasMore;
- do
- {
- hasMore = Boolean.TRUE.equals(xfr.getMore());
-
- ByteBuffer buf = xfr.getPayload();
-
- if(buf != null)
- {
-
- totalSize += buf.remaining();
-
- payloads.add(buf);
- }
- if(hasMore)
- {
- xfr = receiveFromPrefetch(-1l);
- if(xfr== null)
- {
- // TODO - this is wrong!!!!
- System.out.println("eeek");
- }
- }
- }
- while(hasMore && !Boolean.TRUE.equals(xfr.getAborted()));
-
- if(!Boolean.TRUE.equals(xfr.getAborted()))
- {
- ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
- for(ByteBuffer payload : payloads)
- {
- allPayload.put(payload);
- }
- allPayload.flip();
- SectionDecoder decoder = _session.getSectionDecoder();
-
- try
- {
- sections = decoder.parseAll(allPayload);
- }
- catch (AmqpErrorException e)
- {
- // todo - throw a sensible error
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- m = new Message(sections, false);
- m.setDeliveryTag(deliveryTag);
- m.setResume(resume);
- m.setReceiver(this);
- break;
- }
- }
-
- if(wait > 0L)
- {
- wait = endTime - System.currentTimeMillis();
- if(wait <=0L)
- {
- break;
- }
- }
- }
-
-
- return m;
-
- }
-
- private Transfer receiveFromPrefetch(long wait)
- {
- long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L);
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- Transfer xfr;
- while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached()
- && wait != 0)
- {
- try
- {
- if(wait>0L)
- {
- lock.wait(wait);
- }
- else if(wait<0L)
- {
- lock.wait();
- }
- }
- catch (InterruptedException e)
- {
- return null;
- }
- if(wait > 0L)
- {
- wait = endTime - System.currentTimeMillis();
- if(wait <= 0L)
- {
- break;
- }
- }
-
- }
- if(xfr != null)
- {
- _prefetchQueue.poll();
-
- }
-
- return xfr;
- }
-
- }
-
-
- public void release(final Message m)
- {
- release(m.getDeliveryTag());
- }
-
- public void release(Binary deliveryTag)
- {
- update(new Released(), deliveryTag, null, null);
- }
-
-
- public void modified(Binary tag)
- {
- final Modified outcome = new Modified();
- outcome.setDeliveryFailed(true);
-
- update(outcome, tag, null, null);
- }
-
- public void acknowledge(final Message m)
- {
- acknowledge(m.getDeliveryTag());
- }
-
- public void acknowledge(final Message m, SettledAction a)
- {
- acknowledge(m.getDeliveryTag(), a);
- }
-
-
- public void acknowledge(final Message m, Transaction txn)
- {
- acknowledge(m.getDeliveryTag(), txn);
- }
-
-
- public void acknowledge(final Binary deliveryTag)
- {
- acknowledge(deliveryTag, null, null);
- }
-
-
- public void acknowledge(final Binary deliveryTag, SettledAction a)
- {
- acknowledge(deliveryTag, null, a);
- }
-
- public void acknowledge(final Binary deliveryTag, final Transaction txn)
- {
- acknowledge(deliveryTag, txn, null);
- }
-
- public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- update(new Accepted(), deliveryTag, txn, action);
- }
-
- public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action)
- {
-
- DeliveryState state;
- if(txn != null)
- {
- TransactionalState txnState = new TransactionalState();
- txnState.setOutcome(outcome);
- txnState.setTxnId(txn.getTxnId());
- state = txnState;
- }
- else
- {
- state = (DeliveryState) outcome;
- }
- boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
- if(!(settled || action == null))
- {
- _unsettledMap.put(deliveryTag, action);
- }
-
- _endpoint.updateDisposition(deliveryTag,state, settled);
- }
-
- public Error getError()
- {
- return _error;
- }
-
- public void acknowledgeAll(Message m)
- {
- acknowledgeAll(m.getDeliveryTag());
- }
-
- public void acknowledgeAll(Binary deliveryTag)
- {
- acknowledgeAll(deliveryTag, null, null);
- }
-
- public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- updateAll(new Accepted(), deliveryTag, txn, action);
- }
-
- public void updateAll(Outcome outcome, Binary deliveryTag)
- {
- updateAll(outcome, deliveryTag, null, null);
- }
-
- public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action)
- {
- DeliveryState state;
-
- if(txn != null)
- {
- TransactionalState txnState = new TransactionalState();
- txnState.setOutcome(outcome);
- txnState.setTxnId(txn.getTxnId());
- state = txnState;
- }
- else
- {
- state = (DeliveryState) outcome;
- }
- boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
-
- if(!(settled || action == null))
- {
- _unsettledMap.put(deliveryTag, action);
- }
- _endpoint.updateAllDisposition(deliveryTag, state, settled);
- }
-
-
-
- public void close()
- {
- _endpoint.setTarget(null);
- _endpoint.close();
- Message msg;
- while((msg = receive(0l)) != null)
- {
- release(msg);
- }
- _session.removeReceiver(this);
-
- }
-
-
- public void detach()
- {
- _endpoint.setTarget(null);
- _endpoint.detach();
- Message msg;
- while((msg = receive(0l)) != null)
- {
- release(msg);
- }
-
- }
-
- public void drain()
- {
- _endpoint.drain();
- }
-
- /**
- * Waits for the receiver to drain or a message to be available to be received.
- * @return true if the receiver has been drained.
- */
- public boolean drainWait()
- {
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- try
- {
- while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() )
- {
- lock.wait();
- }
- }
- catch (InterruptedException e)
- {
- }
- }
- return _prefetchQueue.peek()==null && _endpoint.isDrained();
- }
-
- /**
- * Clears the receiver drain so that message delivery can resume.
- */
- public void clearDrain()
- {
- _endpoint.clearDrain();
- }
-
- public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
- {
- _endpoint.setLinkCredit(credit);
- _endpoint.setTransactionId(txn == null ? null : txn.getTxnId());
- _endpoint.setCreditWindow(false);
-
- }
-
- public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
- {
- if(Boolean.TRUE.equals(settled))
- {
- SettledAction action = _unsettledMap.remove(deliveryTag);
- if(action != null)
- {
- action.onSettled(deliveryTag);
- }
- }
- }
-
- public Map<Binary, Outcome> getRemoteUnsettled()
- {
- return _endpoint.getInitialUnsettledMap();
- }
-
-
- public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
- {
- synchronized(_endpoint.getLock())
- {
- _messageArrivalListener = messageArrivalListener;
- int prefetchSize = _prefetchQueue.size();
- for(int i = 0; i < prefetchSize; i++)
- {
- postPrefetchAction();
- }
- }
- }
-
- public Session getSession()
- {
- return _session;
- }
-
- public org.apache.qpid.amqp_1_0.type.Source getSource()
- {
- return _endpoint.getSource();
- }
-
- public static interface SettledAction
- {
- public void onSettled(Binary deliveryTag);
- }
-
-
- public interface MessageArrivalListener
- {
- void messageArrived(Receiver receiver);
- }
-
- public void setRemoteErrorListener(Runnable listener)
- {
- _remoteErrorTask = listener;
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLOptions.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLOptions.java
deleted file mode 100644
index 1558b2043b..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLOptions.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-public class SSLOptions
-{
- private final List<String> _enabledProtocols;
- private final List<String> _disabledProtocols;
-
- public SSLOptions(String enabledProtocols, String disabledProtocols)
- {
- if(enabledProtocols == null)
- {
- enabledProtocols = System.getProperty("qpid.ssl.enabledProtocols");
- }
-
- if(disabledProtocols == null)
- {
- disabledProtocols = System.getProperty("qpid.ssl.disabledProtocols", SSLUtil.SSLV3_PROTOCOL);
- }
-
- if(enabledProtocols == null)
- {
- _enabledProtocols = null;
- }
- else
- {
- _enabledProtocols = Collections.unmodifiableList(Arrays.asList(enabledProtocols.split(",")));
- }
-
- if(disabledProtocols == null)
- {
- _disabledProtocols = null;
- }
- else
- {
- _disabledProtocols = Collections.unmodifiableList(Arrays.asList(disabledProtocols.split(",")));
- }
- }
-
- public SSLOptions(final List<String> enabledProtocols, final List<String> disabledProtocols)
- {
- this._enabledProtocols = enabledProtocols == null ? Collections.<String>emptyList() : Collections.unmodifiableList(new ArrayList<>(enabledProtocols));
- this._disabledProtocols = disabledProtocols == null ? Collections.<String>emptyList() : Collections.unmodifiableList(new ArrayList<>(disabledProtocols));
- }
-
- public List<String> getEnabledProtocols()
- {
- return _enabledProtocols;
- }
-
- public List<String> getDisabledProtocols()
- {
- return _disabledProtocols;
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java
deleted file mode 100644
index 7bcf796fa9..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.Socket;
-import java.security.GeneralSecurityException;
-import java.security.KeyStore;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.security.Principal;
-import java.security.PrivateKey;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.logging.Logger;
-
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509ExtendedKeyManager;
-
-public class SSLUtil
-{
- public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS";
- public static final String SSLV3_PROTOCOL = "SSLv3";
-
-
- private static final Logger LOGGER = Logger.getLogger(SSLUtil.class.getName());
-
-
- public static SSLContext buildSslContext(final String certAlias,
- final String keyStorePath,
- final String keyStoreType,
- final String keyStorePassword,
- final String keyManagerFactoryAlgorithm,
- final String trustStorePath,
- final String trustStorePassword,
- final String trustStoreType,
- final String trustManagerFactoryAlgorithm,
- final String sslProtocol,
- final String sslProvider) throws GeneralSecurityException, IOException
- {
-
-
- SSLContext sslContext = getSslContext(sslProtocol, sslProvider);
-
- final TrustManager[] trustManagers;
- final KeyManager[] keyManagers;
-
- if (trustStorePath != null)
- {
- final KeyStore ts = getInitializedKeyStore(trustStorePath, trustStorePassword, trustStoreType);
- final TrustManagerFactory tmf = TrustManagerFactory.getInstance(trustManagerFactoryAlgorithm);
-
- tmf.init(ts);
-
- trustManagers = tmf.getTrustManagers();
- }
- else
- {
- trustManagers = null;
- }
-
- if (keyStorePath != null)
- {
- if (certAlias != null)
- {
- keyManagers = new KeyManager[] { new QpidClientX509KeyManager(
- certAlias, keyStorePath, keyStoreType, keyStorePassword,
- keyManagerFactoryAlgorithm) };
- }
- else
- {
- final KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath, keyStorePassword, keyStoreType);
-
- char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray();
- // Set up key manager factory to use our key store
- final KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm);
- kmf.init(ks, keyStoreCharPassword);
- keyManagers = kmf.getKeyManagers();
- }
- }
- else
- {
- keyManagers = null;
- }
-
-
- sslContext.init(keyManagers, trustManagers, null);
-
- return sslContext;
- }
-
- private static SSLContext getSslContext(final String sslProtocol,
- final String sslProvider) throws NoSuchAlgorithmException
- {
-
- final String sslProviderName = sslProvider != null ? sslProvider : System.getProperty("qpid.ssl.contextProvider");
- final String sslProtocolName = sslProtocol != null ? sslProtocol : System.getProperty("qpid.ssl.contextProtocol", TRANSPORT_LAYER_SECURITY_CODE);
-
- SSLContext sslContext = null;
- if(sslProviderName != null && sslProtocolName != null)
- {
- try
- {
- sslContext = SSLContext.getInstance(sslProtocolName, sslProviderName);
- }
- catch(NoSuchProviderException e)
- {
- LOGGER.info("Unknown SSL Context Provider '"+ sslProviderName + "' will use the default");
- }
- catch (NoSuchAlgorithmException e)
- {
- LOGGER.info("Unknown SSL protocol '" + sslProtocolName
- + "' when using the provider '" + sslProviderName + "' will use the default provider");
- }
- }
- if(sslContext == null && sslProtocolName != null)
- {
- try
- {
- sslContext = SSLContext.getInstance(sslProtocolName);
- }
- catch(NoSuchAlgorithmException e)
- {
- LOGGER.info("Unknown SSL protocol '" + sslProtocolName +
- "' will use '"+TRANSPORT_LAYER_SECURITY_CODE+"'");
- }
- }
- if(sslContext == null)
- {
- sslContext = SSLContext.getInstance(TRANSPORT_LAYER_SECURITY_CODE);
- }
- return sslContext;
- }
-
- public static X509Certificate[] getClientCertificates(final String alias,
- final String keyStorePath,
- final String keyStorePassword,
- final String keyStoreType,
- final String keyManagerFactoryAlgorithm)
- throws GeneralSecurityException, IOException
- {
- return (new QpidClientX509KeyManager(alias,keyStorePath,keyStoreType,keyStorePassword,keyManagerFactoryAlgorithm)).getCertificateChain(alias);
- }
-
- public static KeyStore getInitializedKeyStore(String storePath, String storePassword, String keyStoreType) throws GeneralSecurityException, IOException
- {
- KeyStore ks = KeyStore.getInstance(keyStoreType);
- InputStream in = null;
- try
- {
- File f = new File(storePath);
- if (f.exists())
- {
- in = new FileInputStream(f);
- }
- else
- {
- in = Thread.currentThread().getContextClassLoader().getResourceAsStream(storePath);
- }
- if (in == null && !"PKCS11".equalsIgnoreCase(keyStoreType)) // PKCS11 will not require an explicit path
- {
- throw new IOException("Unable to load keystore resource: " + storePath);
- }
-
- char[] storeCharPassword = storePassword == null ? null : storePassword.toCharArray();
-
- ks.load(in, storeCharPassword);
- }
- finally
- {
- if (in != null)
- {
- //noinspection EmptyCatchBlock
- try
- {
- in.close();
- }
- catch (IOException ignored)
- {
- }
- }
- }
- return ks;
- }
-
- public static class QpidClientX509KeyManager extends X509ExtendedKeyManager
- {
-
- private X509ExtendedKeyManager delegate;
- private String alias;
-
- public QpidClientX509KeyManager(String alias, String keyStorePath, String keyStoreType,
- String keyStorePassword, String keyManagerFactoryAlgorithmName) throws
- GeneralSecurityException,
- IOException
- {
- this.alias = alias;
- KeyStore ks = getInitializedKeyStore(keyStorePath, keyStorePassword, keyStoreType);
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithmName);
- kmf.init(ks, keyStorePassword.toCharArray());
- this.delegate = (X509ExtendedKeyManager) kmf.getKeyManagers()[0];
- }
-
- public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket)
- {
- return alias;
- }
-
- public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket)
- {
- return delegate.chooseServerAlias(keyType, issuers, socket);
- }
-
- public X509Certificate[] getCertificateChain(String alias)
- {
- return delegate.getCertificateChain(alias);
- }
-
- public String[] getClientAliases(String keyType, Principal[] issuers)
- {
- return new String[]{alias};
- }
-
- public PrivateKey getPrivateKey(String alias)
- {
- return delegate.getPrivateKey(alias);
- }
-
- public String[] getServerAliases(String keyType, Principal[] issuers)
- {
- return delegate.getServerAliases(keyType, issuers);
- }
-
- public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine)
- {
- return alias;
- }
-
- public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine)
- {
- return delegate.chooseEngineServerAlias(keyType, issuers, engine);
- }
- }
-
- public static void removeSSLv3Support(final SSLSocket socket)
- {
- List<String> enabledProtocols = Arrays.asList(socket.getEnabledProtocols());
- if(enabledProtocols.contains(SSLV3_PROTOCOL))
- {
- List<String> allowedProtocols = new ArrayList<>(enabledProtocols);
- allowedProtocols.remove(SSLV3_PROTOCOL);
- socket.setEnabledProtocols(allowedProtocols.toArray(new String[allowedProtocols.size()]));
- }
- }
-
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
deleted file mode 100644
index 2b76344085..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Predicate;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.Source;
-import org.apache.qpid.amqp_1_0.type.Target;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.Detach;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-public class Sender implements DeliveryStateHandler
-{
- private static final long UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER = 1000l;
- private static final long DEFAULT_CREDIT_TIMEOUT = 30000l;
-
- private SendingLinkEndpoint _endpoint;
- private int _id;
- private Session _session;
- private int _windowSize;
- private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
- private boolean _closed;
- private Error _error;
- private Runnable _remoteErrorTask;
- private Outcome _defaultOutcome;
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, false);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- boolean synchronous)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
- }
-
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window) throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, AcknowledgeMode.ALO);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, target, source, window, mode, null);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
- }
-
- public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
- int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
- this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
- }
-
- protected void configureSource(org.apache.qpid.amqp_1_0.type.messaging.Source source)
- {
-
- }
-
- protected void configureTarget(org.apache.qpid.amqp_1_0.type.messaging.Target target)
- {
-
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
- source.setAddress(sourceAddr);
- return source;
- }
-
- private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
- {
- org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
- target.setAddress(targetAddr);
- if(isDurable)
- {
- target.setDurable(TerminusDurability.UNSETTLED_STATE);
- target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
- }
- return target;
- }
-
- public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
- int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
- throws SenderCreationException, ConnectionClosedException
- {
-
- _session = session;
- _windowSize = window;
- session.getConnection().checkNotClosed();
- configureSource(source);
- configureTarget(target);
- _endpoint = session.createSendingLinkEndpoint(linkName, target, source, mode, unsettled, this);
-
- synchronized(_endpoint.getLock())
- {
- try
- {
- _endpoint.waitUntil(new Predicate()
- {
- @Override
- public boolean isSatisfied()
- {
- return _endpoint.isAttached() || _endpoint.isDetached();
- }
- });
- }
- catch (TimeoutException e)
- {
- throw new SenderCreationException(e);
- }
- catch (InterruptedException e)
- {
- throw new SenderCreationException(e);
- }
-
- if (session.getEndpoint().isEnded())
- {
- throw new SenderCreationException("Session is closed while creating link, target: " + target.getAddress());
- }
- if(_endpoint.getTarget()== null)
- {
- throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
- }
- }
-
- _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
- {
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _error = detach.getError();
- if(_error != null)
- {
- remoteError();
- }
- super.remoteDetached(endpoint, detach);
- }
- });
-
- _defaultOutcome = source.getDefaultOutcome();
- if(_defaultOutcome == null)
- {
- if(source.getOutcomes() == null || source.getOutcomes().length == 0)
- {
- _defaultOutcome = new Accepted();
- }
- else if(source.getOutcomes().length == 1)
- {
-
- final AMQPDescribedTypeRegistry describedTypeRegistry = _endpoint.getSession()
- .getConnection()
- .getDescribedTypeRegistry();
-
- DescribedTypeConstructor constructor = describedTypeRegistry
- .getConstructor(source.getOutcomes()[0]);
- if(constructor != null)
- {
- Object impliedOutcome = constructor.construct(Collections.EMPTY_LIST);
- if(impliedOutcome instanceof Outcome)
- {
- _defaultOutcome = (Outcome) impliedOutcome;
- }
- }
-
- }
- }
- }
-
- public Source getSource()
- {
- return _endpoint.getSource();
- }
-
- public Target getTarget()
- {
- return _endpoint.getTarget();
- }
-
- public void send(Message message) throws LinkDetachedException, TimeoutException
- {
- send(message, null, null);
- }
-
- public void send(Message message, final OutcomeAction action) throws LinkDetachedException, TimeoutException
- {
- send(message, null, action);
- }
-
- public void send(Message message, final Transaction txn) throws LinkDetachedException, TimeoutException
- {
- send(message, txn, null);
- }
-
- public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException, TimeoutException
- {
-
- List<Section> sections = message.getPayload();
-
- Transfer xfr = new Transfer();
-
- if(sections != null && !sections.isEmpty())
- {
- SectionEncoder encoder = _session.getSectionEncoder();
- encoder.reset();
-
- int sectionNumber = 0;
- for(Section section : sections)
- {
- encoder.encodeObject(section);
- }
-
-
- Binary encoding = encoder.getEncoding();
- ByteBuffer payload = encoding.asByteBuffer();
- xfr.setPayload(payload);
- }
- if(message.getDeliveryTag() == null)
- {
- message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
- }
- if(message.isResume())
- {
- xfr.setResume(Boolean.TRUE);
- }
- if(message.getDeliveryState() != null)
- {
- xfr.setState(message.getDeliveryState());
- }
-
- xfr.setDeliveryTag(message.getDeliveryTag());
- //xfr.setSettled(_windowSize ==0);
- if(txn != null)
- {
- xfr.setSettled(false);
- TransactionalState deliveryState = new TransactionalState();
- deliveryState.setTxnId(txn.getTxnId());
- xfr.setState(deliveryState);
- }
- else
- {
- xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
- }
- final Object lock = _endpoint.getLock();
-
- synchronized(lock)
- {
-
- try
- {
- _endpoint.waitUntil(new Predicate()
- {
- @Override
- public boolean isSatisfied()
- {
- return _endpoint.hasCreditToSend() || _endpoint.isDetached();
- }
- }, getCreditTimeout());
- }
- catch (InterruptedException e)
- {
- throw new TimeoutException("Interrupted while waiting for credit");
- }
-
- if(_endpoint.isDetached())
- {
- throw new LinkDetachedException(_error);
- }
- if(action != null)
- {
- _outcomeActions.put(message.getDeliveryTag(), action);
- }
- _endpoint.transfer(xfr);
- }
-
- if(_windowSize != 0)
- {
- try
- {
- _endpoint.waitUntil(new Predicate()
- {
- @Override
- public boolean isSatisfied()
- {
- return _endpoint.getUnsettledCount() < _windowSize;
- }
- }, getUnsettledTimeout());
- }
- catch (InterruptedException e)
- {
- throw new TimeoutException("Interrupted while waiting for the window to expand to allow sending");
- }
-
- }
-
-
- }
-
- private long getCreditTimeout()
- {
- return _endpoint.getSyncTimeout() < DEFAULT_CREDIT_TIMEOUT ? DEFAULT_CREDIT_TIMEOUT : _endpoint.getSyncTimeout();
- }
-
- public void close() throws SenderClosingException
- {
- boolean unsettledDeliveries = false;
-
- if(_windowSize != 0)
- {
- long timeout = getUnsettledTimeout();
-
- try
- {
- _endpoint.waitUntil(new Predicate()
- {
- @Override
- public boolean isSatisfied()
- {
- return _endpoint.getUnsettledCount() == 0;
- }
- }, timeout);
- }
- catch (InterruptedException e)
- {
- unsettledDeliveries = true;
- }
- catch (TimeoutException e)
- {
- unsettledDeliveries = true;
- }
-
- }
- _session.removeSender(this);
- _endpoint.setSource(null);
- _endpoint.close();
- _closed = true;
-
- try
- {
- _endpoint.waitUntil(new Predicate()
- {
- @Override
- public boolean isSatisfied()
- {
- return _endpoint.isDetached();
- }
- });
- }
- catch (TimeoutException e)
- {
- throw new SenderClosingException("Timed out attempting to detach link", e);
- }
- catch (InterruptedException e)
- {
- throw new SenderClosingException("Interrupted while attempting to detach link", e);
- }
- if(unsettledDeliveries && _endpoint.getUnsettledCount() > 0)
- {
- throw new SenderClosingException("Some messages may not have been received by the recipient");
- }
- }
-
- private long getUnsettledTimeout()
- {
- long timeout = _endpoint.getSyncTimeout();
-
- // give a generous timeout where there are unsettled messages
- if(timeout < _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER)
- {
- timeout = _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER;
- }
- return timeout;
- }
-
- public boolean isClosed()
- {
- return _closed;
- }
-
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
- {
- OutcomeAction action;
- if(state instanceof Outcome)
- {
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
-
- final Outcome outcome = (Outcome) state;
- action.onOutcome(deliveryTag, (outcome == null && settled) ? _defaultOutcome : outcome);
- }
- if(!Boolean.TRUE.equals(settled))
- {
- _endpoint.updateDisposition(deliveryTag, state, true);
- }
- }
- else if(state instanceof TransactionalState)
- {
- if((action = _outcomeActions.remove(deliveryTag)) != null)
- {
- final Outcome outcome = ((TransactionalState) state).getOutcome();
- action.onOutcome(deliveryTag, outcome == null ? _defaultOutcome : outcome);
- }
-
- }
- else if(state == null && settled && (action = _outcomeActions.remove(deliveryTag)) != null)
- {
- action.onOutcome(deliveryTag, _defaultOutcome);
- }
- }
-
- public SendingLinkEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public Map<Binary, DeliveryState> getRemoteUnsettled()
- {
- return _endpoint.getInitialUnsettledMap();
- }
-
- public Session getSession()
- {
- return _session;
- }
-
-
- private void remoteError()
- {
- if(_remoteErrorTask != null)
- {
- Thread thread = new Thread(_remoteErrorTask);
- thread.start();
- }
- }
-
-
- public void setRemoteErrorListener(Runnable listener)
- {
- _remoteErrorTask = listener;
- }
-
- public Error getError()
- {
- return _error;
- }
-
- public class SenderCreationException extends Exception
- {
- public SenderCreationException(Throwable e)
- {
- super(e);
- }
-
- public SenderCreationException(String e)
- {
- super(e);
-
- }
- }
-
- public class SenderClosingException extends Exception
- {
- public SenderClosingException(final String message, final Throwable cause)
- {
- super(message, cause);
- }
-
- public SenderClosingException(Throwable e)
- {
- super(e);
- }
-
- public SenderClosingException(final String message)
- {
- super(message);
- }
- }
-
- public static interface OutcomeAction
- {
- public void onOutcome(Binary deliveryTag, Outcome outcome);
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
deleted file mode 100644
index 2c3857a689..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
+++ /dev/null
@@ -1,462 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SessionState;
-import org.apache.qpid.amqp_1_0.type.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class Session
-{
- private SessionEndpoint _endpoint;
- private List<Receiver> _receivers = new ArrayList<Receiver>();
- private List<Sender> _senders = new ArrayList<Sender>();
- private SectionEncoder _sectionEncoder;
- private SectionDecoder _sectionDecoder;
- private TransactionController _sessionLocalTC;
- private Connection _connection;
-
- public Session(final Connection connection, String name) throws ChannelsExhaustedException
- {
- _connection = connection;
- _endpoint = connection.getEndpoint().createSession(name);
- if(_endpoint == null)
- {
- throw new ChannelsExhaustedException("Cannot create session as all channels are in use");
- }
- _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
- _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
- }
-
-
- public Sender createSender(final String targetName)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
-
- final String sourceName = UUID.randomUUID().toString();
- return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false);
-
- }
-
-
- public Sender createSender(final String targetName, final SourceConfigurator configurator)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- final String sourceName = UUID.randomUUID().toString();
- return createSender(sourceName, targetName, configurator);
- }
-
- public Sender createSender(final String sourceName, final String targetName, final SourceConfigurator configurator)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
-
- return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false)
- {
- @Override
- protected void configureSource(final Source source)
- {
- configurator.configureSource(source);
- }
- };
-
- }
-
- public Sender createSender(final String targetName, int window)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- final String sourceName = UUID.randomUUID().toString();
- return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
-
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return createSender(targetName, window, mode, linkName, false, null);
- }
-
- public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
- throws Sender.SenderCreationException, ConnectionClosedException
- {
- return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
- targetName, null, window, mode, isDurable, unsettled);
-
- }
-
- public SendingLinkEndpoint createSendingLinkEndpoint(final String linkName,
- final Target target,
- final Source source,
- AcknowledgeMode mode,
- Map<Binary, Outcome> unsettled,
- final DeliveryStateHandler deliveryStateHandler)
- {
- SessionEndpoint endpoint = this.getEndpoint();
- synchronized(endpoint.getLock())
- {
- SendingLinkEndpoint link = endpoint.createSendingLinkEndpoint(linkName, source, target,
- unsettled, deliveryStateHandler);
-
- switch(mode)
- {
- case ALO:
- link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- link.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- break;
- case AMO:
- link.setSendingSettlementMode(SenderSettleMode.SETTLED);
- break;
- case EO:
- link.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- link.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
- break;
-
- }
-
- link.attach();
- return link;
- }
- }
-
- public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, UUID.randomUUID().toString(), null, AcknowledgeMode.ALO);
- }
-
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
- throws ConnectionErrorException
- {
- return createReceiver(queue, UUID.randomUUID().toString(), null, mode);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
- throws ConnectionErrorException
- {
- return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName, isDurable);
- }
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(queue, (DistributionMode) null, mode, linkName, isDurable, filters, unsettled);
- }
-
- public Receiver createReceiver(final String queue, String targetName, final AcknowledgeMode mode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(queue, targetName, null, mode, linkName, isDurable, filters, unsettled);
- }
-
-
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName, isDurable, unsettled);
- }
-
-
- private synchronized Receiver createReceiver(final String sourceAddr,
- final String targetAddr,
- DistributionMode mode)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, targetAddr, mode, AcknowledgeMode.ALO);
- }
-
-
- private synchronized Receiver createReceiver(final String sourceAddr,
- final String targetAddr,
- DistributionMode mode,
- final AcknowledgeMode ackMode)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, targetAddr, mode, ackMode, null);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr,
- final String targetAddr,
- DistributionMode mode,
- final AcknowledgeMode ackMode,
- String linkName)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, targetAddr, mode, ackMode, linkName, false);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr,
- final String targetAddr,
- DistributionMode mode,
- final AcknowledgeMode ackMode,
- String linkName,
- boolean isDurable)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, targetAddr, mode, ackMode, linkName, isDurable, null);
- }
-
- private synchronized Receiver createReceiver(final String sourceAddr,
- final String targetAddr,
- DistributionMode mode,
- final AcknowledgeMode ackMode,
- String linkName,
- boolean isDurable,
- Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
- }
-
- public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, UUID.randomUUID().toString(), mode, ackMode, linkName, isDurable, filters, unsettled);
- }
-
- public synchronized Receiver createReceiver(final String sourceAddr, String targetAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
- throws ConnectionErrorException
- {
-
- final Target target = new Target();
- target.setAddress(targetAddr);
- final Source source = new Source();
- source.setAddress(sourceAddr);
- source.setDistributionMode(mode);
- source.setFilter(filters);
-
- if(linkName == null)
- {
- linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
- }
-
- final Receiver receiver =
- new Receiver(this, linkName,
- target, source, ackMode, isDurable, unsettled);
- _receivers.add(receiver);
-
- return receiver;
-
- }
-
- public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.COPY);
- }
-
- public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.MOVE);
- }
-
- public synchronized Receiver createMovingReceiver(final String sourceAddr, final String targetAddr) throws ConnectionErrorException
- {
- return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.MOVE);
- }
-
- public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException
- {
- Source source = new Source();
- source.setDynamic(true);
-
- final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
- source, AcknowledgeMode.ALO);
- _receivers.add(receiver);
- return receiver;
- }
-
- public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException
- {
- Target target = new Target();
- target.setDynamic(true);
-
- final Sender sender;
- sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
- new Source(), 0, AcknowledgeMode.ALO);
- _senders.add(sender);
- return sender;
- }
-
-
-
- public SessionEndpoint getEndpoint()
- {
- return _endpoint;
- }
-
- public synchronized void close()
- {
- try
- {
- for(Sender sender : new ArrayList<Sender>(_senders))
- {
- sender.close();
- }
- for(Receiver receiver : new ArrayList<Receiver>(_receivers))
- {
- receiver.detach();
- }
- if(_sessionLocalTC != null)
- {
- _sessionLocalTC.close();
- }
- _endpoint.end();
- }
- catch (Sender.SenderClosingException e)
- {
-// TODO
- e.printStackTrace();
- }
-
- //TODO
-
- }
-
- void removeSender(Sender sender)
- {
- _senders.remove(sender);
- }
-
- void removeReceiver(Receiver receiver)
- {
- _receivers.remove(receiver);
- }
-
- public SectionEncoder getSectionEncoder()
- {
- return _sectionEncoder;
- }
-
- public SectionDecoder getSectionDecoder()
- {
- return _sectionDecoder;
- }
-
-
- public Transaction createSessionLocalTransaction() throws LinkDetachedException
- {
- TransactionController localController = getSessionLocalTransactionController();
- return localController.beginTransaction();
- }
-
- private TransactionController getSessionLocalTransactionController()
- {
- if(_sessionLocalTC == null)
- {
- _sessionLocalTC = createSessionLocalTransactionController();
- }
- return _sessionLocalTC;
- }
-
- private TransactionController createSessionLocalTransactionController()
- {
- String name = "txnControllerLink";
- SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
- TxnCapability.MULTI_TXNS_PER_SSN);
- tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
- tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
- tcLinkEndpoint.attach();
- return new TransactionController(this, tcLinkEndpoint);
- }
-
-
- public Message receive()
- {
- while(getEndpoint().getState() == SessionState.ACTIVE)
- {
- synchronized (getEndpoint().getLock())
- {
- try
- {
- for(Receiver r : _receivers)
- {
- Message m = r.receive(false);
- if(m != null)
- return m;
- }
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- return null;
- }
-
- public Connection getConnection()
- {
- return _connection;
- }
-
- public void awaitActive()
- {
- synchronized(getEndpoint().getLock())
- {
- while(!getEndpoint().isEnded() && !getEndpoint().isActive())
- {
- try
- {
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
-
- public static interface SourceConfigurator
- {
- public void configureSource(final Source source);
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvider.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvider.java
deleted file mode 100644
index e3a982d8a9..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvider.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-
-import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
-import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
-
-class TCPTransportProvider implements TransportProvider
-{
- private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
-
- private Socket _socket;
- private final String _transport;
-
- // Defines read socket timeout in milliseconds. A value of 0 means that the socket
- // read will block forever. Default value is set to 10000, which is 10 seconds.
- private int _readTimeout = Integer.getInteger("qpid.connection_read_timeout", 10000);
-
- // Defines the max idle read timeout in milliseconds before the connection is closed down in
- // the event of a SocketTimeoutException. A value of -1L will disable idle read timeout checking.
- // Default value is set to -1L, which means disable idle read checks.
- private long _readIdleTimeout = Long.getLong("qpid.connection_read_idle_timeout", -1L);
- private final AtomicLong _threadNameIndex = new AtomicLong();
-
- public TCPTransportProvider(final String transport)
- {
- _transport = transport;
- }
-
- @Override
- public void connect(final ConnectionEndpoint conn,
- final String address,
- final int port,
- final SSLContext sslContext,
- final SSLOptions sslOptions, final ExceptionHandler exceptionHandler) throws ConnectionException
- {
- try
- {
- if(sslContext != null)
- {
- final SSLSocketFactory socketFactory = sslContext.getSocketFactory();
- SSLSocket sslSocket = (SSLSocket) socketFactory.createSocket(address, port);
- if(sslOptions == null)
- {
- SSLUtil.removeSSLv3Support(sslSocket);
- }
- else
- {
- final List<String> enabledProtocols = sslOptions.getEnabledProtocols();
- final List<String> disabledProtocols = sslOptions.getDisabledProtocols();
-
- if(enabledProtocols != null && !enabledProtocols.isEmpty())
- {
- final Set<String> supportedSuites =
- new HashSet<>(Arrays.asList(sslSocket.getSupportedProtocols()));
- supportedSuites.retainAll(enabledProtocols);
- sslSocket.setEnabledProtocols(supportedSuites.toArray(new String[supportedSuites.size()]));
- }
-
- if(disabledProtocols != null && !disabledProtocols.isEmpty())
- {
- final Set<String> enabledSuites = new HashSet<>(Arrays.asList(sslSocket.getEnabledProtocols()));
- enabledSuites.removeAll(disabledProtocols);
- sslSocket.setEnabledProtocols(enabledSuites.toArray(new String[enabledSuites.size()]));
- }
- }
- sslSocket.startHandshake();
- conn.setExternalPrincipal(sslSocket.getSession().getLocalPrincipal());
- _socket=sslSocket;
- }
- else
- {
- _socket = new Socket(address, port);
- }
- // set socket read timeout
- _socket.setSoTimeout(_readTimeout);
-
- conn.setRemoteAddress(_socket.getRemoteSocketAddress());
-
- ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn);
-
- ConnectionHandler.BytesSource src;
-
- if(conn.requiresSASL())
- {
- ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(conn);
-
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)3,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),conn.getDescribedTypeRegistry()),
- new ConnectionHandler.HeaderBytesSource(conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry())
- );
-
- conn.setSaslFrameOutput(saslOut);
- }
- else
- {
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn,(byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry())
- );
- }
-
-
- final OutputStream outputStream = _socket.getOutputStream();
- ConnectionHandler.BytesOutputHandler outputHandler =
- new ConnectionHandler.BytesOutputHandler(outputStream, src, conn, exceptionHandler);
- long threadIndex = _threadNameIndex.getAndIncrement();
- Thread outputThread = new Thread(outputHandler, "QpidConnectionOutputThread-"+threadIndex);
-
- outputThread.setDaemon(true);
- outputThread.start();
- conn.setFrameOutputHandler(out);
-
-
- final ConnectionHandler handler = new ConnectionHandler(conn);
- final InputStream inputStream = _socket.getInputStream();
-
- Thread inputThread = new Thread(new Runnable()
- {
-
- public void run()
- {
- try
- {
- doRead(conn, handler, inputStream);
- }
- finally
- {
- if(conn.closedForInput() && conn.closedForOutput())
- {
- close();
- }
- }
- }
- },"QpidConnectionInputThread-"+threadIndex);
-
- inputThread.setDaemon(true);
- inputThread.start();
-
- }
- catch (IOException e)
- {
- throw new ConnectionException(e);
- }
- }
-
- @Override
- public void close()
- {
- try
- {
- _socket.close();
- }
- catch (IOException e)
- {
- RAW_LOGGER.log(Level.WARNING, "Unexpected Error during TCPTransportProvider socket close", e);
- }
- }
-
- private void doRead(final ConnectionEndpoint conn, final ConnectionHandler handler, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
-
-
- try
- {
- int read;
- boolean done = false;
- long lastReadTime = System.currentTimeMillis();
- while(!handler.isDone())
- {
- try
- {
- read = inputStream.read(buf);
- if(read == -1)
- {
- break;
- }
- lastReadTime = System.currentTimeMillis();
-
- ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
- while(bbuf.hasRemaining() && !handler.isDone())
- {
- handler.parse(bbuf);
- }
-
- }
- catch(SocketTimeoutException e)
- {
- // Note that a SocketTimeoutException could only occur if _readTimeout > 0.
- // Only perform idle read timeout checking if _readIdleTimeout is greater than -1
- if(_readIdleTimeout > -1 && (System.currentTimeMillis() - lastReadTime >= _readIdleTimeout)){
- // break out of while loop and close down connection
- break;
- }
- }
- }
- if(!handler.isDone())
- {
- conn.inputClosed();
- if(conn.getConnectionEventListener() != null)
- {
- conn.getConnectionEventListener().closeReceived();
- }
- }
- }
- catch (IOException e)
- {
- conn.inputClosed();
- RAW_LOGGER.log(Level.INFO, "IO Error reading from connection", e);
- }
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java
deleted file mode 100644
index a29bbcd232..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-public class TCPTransportProviderFactory implements TransportProviderFactory
-{
- @Override
- public Collection<String> getSupportedTransports()
- {
- return Arrays.asList("amqp","amqps");
- }
-
- @Override
- public TransportProvider getProvider(final String transport)
- {
- return new TCPTransportProvider(transport);
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
deleted file mode 100644
index e67f9e2fce..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.type.Binary;
-
-public class Transaction
-{
- private TransactionController _transactionController;
- private Binary _txnId;
-
- Transaction(final TransactionController transactionController, Binary txnId)
- {
- _transactionController = transactionController;
- _txnId = txnId;
- }
-
- public void commit() throws LinkDetachedException
- {
- _transactionController.commit(this);
- }
-
- public void rollback() throws LinkDetachedException
- {
- _transactionController.rollback(this);
- }
-
- public Binary getTxnId()
- {
- return _txnId;
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java
deleted file mode 100644
index 4a4cce1146..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.client;
-
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
-import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.transaction.Declare;
-import org.apache.qpid.amqp_1_0.type.transaction.Declared;
-import org.apache.qpid.amqp_1_0.type.transaction.Discharge;
-import org.apache.qpid.amqp_1_0.type.transport.*;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-
-public class TransactionController implements DeliveryStateHandler
-{
- private static final Binary DELIVERY_TAG = new Binary(new byte[]{(byte) 0});
- private SendingLinkEndpoint _endpoint;
- private Session _session;
- private volatile DeliveryState _state;
- private boolean _received;
- private Error _error;
-
- public TransactionController(Session session, SendingLinkEndpoint tcLinkEndpoint)
- {
- _session = session;
- _endpoint = tcLinkEndpoint;
- _endpoint.setDeliveryStateHandler(this);
- _endpoint.setLinkEventListener(new SendingLinkListener()
- {
- @Override
- public void flowStateChanged()
- {
- // ignore
- }
-
- @Override
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- TransactionController.this.remoteDetached(detach);
- }
- });
- }
-
- public Transaction beginTransaction() throws LinkDetachedException
- {
-
-
- Binary txnId = declare();
- return new Transaction(this, txnId);
- }
-
- private Binary declare() throws LinkDetachedException
- {
- SectionEncoder encoder = _session.getSectionEncoder();
-
-
- AmqpValue section = new AmqpValue(new Declare());
-
-
- Transfer transfer = new Transfer();
- transfer.setPayload(section.encode(encoder).asByteBuffer());
- transfer.setDeliveryTag(DELIVERY_TAG);
- transfer.setSettled(Boolean.FALSE);
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- while(!_endpoint.hasCreditToSend())
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
-
- }
- }
- _state = null;
- _received = false;
- _endpoint.transfer(transfer);
-
- //TODO - rationalise sending of flows
- // _endpoint.sendFlow();
- }
- waitForResponse();
-
-
- return ((Declared) _state).getTxnId();
- }
-
- private void waitForResponse() throws LinkDetachedException
- {
- final Object lock = _endpoint.getLock();
- synchronized (lock)
- {
- while(!_received && !_endpoint.isDetached())
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
-
- }
- }
- }
- if(!_received && _endpoint.isDetached())
- {
- throw new LinkDetachedException(_error);
- }
- }
-
- private void remoteDetached(Detach detach)
- {
- final Object lock = _endpoint.getLock();
- synchronized (lock)
- {
- if (detach != null && detach.getError() != null)
- {
- _error = detach.getError();
- lock.notifyAll();
- }
- }
- }
-
-
- public void commit(final Transaction transaction) throws LinkDetachedException
- {
- discharge(transaction.getTxnId(), false);
- }
-
- public void rollback(final Transaction transaction) throws LinkDetachedException
- {
- discharge(transaction.getTxnId(), true);
- }
-
- private void discharge(final Binary txnId, final boolean fail) throws LinkDetachedException
- {
- Discharge discharge = new Discharge();
- discharge.setTxnId(txnId);
- discharge.setFail(fail);
- SectionEncoder encoder = _session.getSectionEncoder();
-
-
- AmqpValue section = new AmqpValue(discharge);
-
- Transfer transfer = new Transfer();
- transfer.setPayload(section.encode(encoder).asByteBuffer());
- transfer.setDeliveryTag(DELIVERY_TAG);
- transfer.setSettled(Boolean.FALSE);
-
- final Object lock = _endpoint.getLock();
- synchronized(lock)
- {
- while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
-
- }
- }
- if(_endpoint.isDetached())
- {
- throw new LinkDetachedException(_error);
- }
- _state = null;
- _received = false;
- _endpoint.transfer(transfer);
-
- //TODO - rationalise sending of flows
- // _endpoint.sendFlow();
- }
- waitForResponse();
-
-
- }
-
- public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
- {
- synchronized(this)
- {
- _state = state;
- _received = true;
-
- if(!Boolean.TRUE.equals(settled))
- {
- _endpoint.updateDisposition(deliveryTag, state, true);
- }
-
- notifyAll();
- }
- }
-
- public void close()
- {
- _endpoint.close();
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
deleted file mode 100644
index e8ea53b451..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-
-public interface TransportProvider
-{
- void connect(ConnectionEndpoint conn,
- String address,
- int port,
- SSLContext sslContext,
- final SSLOptions sslOptions,
- ExceptionHandler exceptionHandler) throws ConnectionException;
-
- void close();
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java
deleted file mode 100644
index 82999c5ccc..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-import java.util.Collection;
-
-public interface TransportProviderFactory
-{
- Collection<String> getSupportedTransports();
- TransportProvider getProvider(String transport);
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory b/qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory
deleted file mode 100644
index ffde030b30..0000000000
--- a/qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-org.apache.qpid.amqp_1_0.client.TCPTransportProviderFactory \ No newline at end of file