diff options
| author | Alex Rudyy <orudyy@apache.org> | 2015-04-15 09:47:28 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2015-04-15 09:47:28 +0000 |
| commit | 0a0baee45ebcff44635907d457c4ff6810b09c87 (patch) | |
| tree | 8bfb0f9eddbc23cff88af69be80ab3ce7d47011c /qpid/java/amqp-1-0-client | |
| parent | 54aa3d7070da16ce55c28ccad3f7d0871479e461 (diff) | |
| download | qpid-python-0a0baee45ebcff44635907d457c4ff6810b09c87.tar.gz | |
QPID-6481: Move java source tree to top level
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1673693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-client')
34 files changed, 0 insertions, 6768 deletions
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java deleted file mode 100644 index 3bb26744c4..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.lang.reflect.InvocationTargetException; - -public class Command -{ - public static void main(String[] args) throws - ClassNotFoundException, - NoSuchMethodException, - InvocationTargetException, - IllegalAccessException, - InstantiationException - { - String name = args[0]; - String[] cmdArgs = new String[args.length-1]; - System.arraycopy(args,1,cmdArgs,0,args.length-1); - name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase(); - Class<Util> clazz = (Class<Util>) Class.forName(name); - Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs); - util.run(); - - } -} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java deleted file mode 100644 index 09d19f4394..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ /dev/null @@ -1,430 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; - -public class Demo extends Util -{ - private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:"; - private static final String OPCODE = "opcode"; - private static final String ACTION = "action"; - private static final String MESSAGE_ID = "message-id"; - private static final String VENDOR = "vendor"; - private static final String LOG = "log"; - private static final String RECEIVED = "received"; - private static final String TEST = "test"; - private static final String APACHE = "apache"; - private static final String SENT = "sent"; - private static final String LINK_REF = "link-ref"; - private static final String HOST = "host"; - private static final String PORT = "port"; - private static final String SASL_USER = "sasl-user"; - private static final String SASL_PASSWORD = "sasl-password"; - private static final String ROLE = "role"; - private static final String ADDRESS = "address"; - private static final String SENDER = "sender"; - private static final String SEND_MESSAGE = "send-message"; - private static final String ANNOUNCE = "announce"; - private static final String MESSAGE_VENDOR = "message-vendor"; - private static final String CREATE_LINK = "create-link"; - - public static void main(String[] args) - { - new Demo(args).run(); - } - - public Demo(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected boolean hasWindowSizeOption() - { - return false; - } - - public void run() - { - - try - { - - final String vendor = getArgs()[0]; - final String queue = "control"; - - String message = ""; - - Connection conn = newConnection(); - Session session = conn.createSession(); - - - Receiver responseReceiver; - - responseReceiver = session.createTemporaryQueueReceiver(); - - - - - responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - - Sender s = session.createSender(queue, getWindowSize(), getMode(), null); - - - Properties properties = new Properties(); - properties.setMessageId(java.util.UUID.randomUUID()); - properties.setReplyTo(responseReceiver.getAddress()); - - HashMap appPropMap = new HashMap(); - ApplicationProperties appProperties = new ApplicationProperties(appPropMap); - - appPropMap.put(OPCODE, ANNOUNCE); - appPropMap.put(VENDOR, vendor); - appPropMap.put(ADDRESS,responseReceiver.getAddress()); - - AmqpValue amqpValue = new AmqpValue(message); - Section[] sections = { properties, appProperties, amqpValue}; - final Message message1 = new Message(Arrays.asList(sections)); - - try - { - s.send(message1); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>(); - Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>(); - - - boolean done = false; - - while(!done) - { - boolean wait = true; - Message m = responseReceiver.receive(false); - if(m != null) - { - List<Section> payload = m.getPayload(); - wait = false; - ApplicationProperties props = m.getApplicationProperties(); - Map map = props.getValue(); - String op = (String) map.get(OPCODE); - if("reset".equals(op)) - { - for(Sender sender : sendingLinks.values()) - { - try - { - sender.close(); - Session session1 = sender.getSession(); - session1.close(); - session1.getConnection().close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - for(Receiver receiver : receivingLinks.values()) - { - try - { - receiver.close(); - receiver.getSession().close(); - receiver.getSession().getConnection().close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - sendingLinks.clear(); - receivingLinks.clear(); - } - else if(CREATE_LINK.equals(op)) - { - Object linkRef = map.get(LINK_REF); - String host = (String) map.get(HOST); - Object o = map.get(PORT); - int port = Integer.parseInt(String.valueOf(o)); - String user = (String) map.get(SASL_USER); - String password = (String) map.get(SASL_PASSWORD); - String role = (String) map.get(ROLE); - String address = (String) map.get(ADDRESS); - System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password); - try{ - - - Connection conn2 = new Connection(host, port, user, password, host); - Session session2 = conn2.createSession(); - if(sendingLinks.containsKey(linkRef)) - { - try - { - sendingLinks.remove(linkRef).close(); - } - catch (Exception e) - { - - } - } - if(receivingLinks.containsKey(linkRef)) - { - try - { - receivingLinks.remove(linkRef).close(); - } - catch (Exception e) - { - - } - } - if(SENDER.equals(role)) - { - - System.err.println("%%% Creating sender (" + linkRef + ")"); - Sender sender = session2.createSender(address); - sendingLinks.put(linkRef, sender); - } - else - { - - System.err.println("%%% Creating receiver (" + linkRef + ")"); - Receiver receiver2 = session2.createReceiver(address); - receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - receivingLinks.put(linkRef, receiver2); - } - } - catch(Exception e) - { - e.printStackTrace(); - } - } - else if(SEND_MESSAGE.equals(op)) - { - Sender sender = sendingLinks.get(map.get(LINK_REF)); - Properties m2props = new Properties(); - Object messageId = map.get(MESSAGE_ID); - m2props.setMessageId(messageId); - Map m2propmap = new HashMap(); - m2propmap.put(OPCODE, TEST); - m2propmap.put(VENDOR, vendor); - ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); - Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); - try - { - sender.send(m2); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - Map m3propmap = new HashMap(); - m3propmap.put(OPCODE, LOG); - m3propmap.put(ACTION, SENT); - m3propmap.put(MESSAGE_ID, messageId); - m3propmap.put(VENDOR, vendor); - m3propmap.put(MESSAGE_VENDOR, vendor); - - - Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), - new AmqpValue("AMQP-"+messageId))); - try - { - s.send(m3); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - responseReceiver.acknowledge(m); - } - else - { - for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet()) - { - m = entry.getValue().receive(false); - if(m != null) - { - wait = false; - - System.err.println("%%% Received message from " + entry.getKey()); - - Properties mp = m.getProperties(); - ApplicationProperties ap = m.getApplicationProperties(); - - Map m3propmap = new HashMap(); - m3propmap.put(OPCODE, LOG); - m3propmap.put(ACTION, RECEIVED); - m3propmap.put(MESSAGE_ID, mp.getMessageId()); - m3propmap.put(VENDOR, vendor); - m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR)); - - Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), - new AmqpValue("AMQP-"+mp.getMessageId()))); - try - { - s.send(m3); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - entry.getValue().acknowledge(m); - } - - } - } - - if(wait) - { - try - { - Thread.sleep(500l); - } - catch (InterruptedException e) - { - e.printStackTrace(); //TODO. - } - } - - } - - - - - - - - - - s.close(); - session.close(); - conn.close(); - - } - catch (ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return false; - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java deleted file mode 100644 index 06440b8f19..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.commons.cli.Options; - -public class Dump extends Util -{ - private static final String USAGE_STRING = "dump [options] <address>\n\nOptions:"; - - - protected Dump(String[] args) - { - super(args); - } - - public static void main(String[] args) - { - new Dump(args).run(); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasLinkNameOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasSizeOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasBlockOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasStdInOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasTxnOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasModeOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasCountOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected void printUsage(Options options) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - - Sender s = session.createSender(queue, 10); - - Message message = new Message("dump me"); - message.setDeliveryTag(new Binary("dump".getBytes())); - - s.send(message); - - s.close(); - session.close(); - conn.close(); - - } catch (Exception e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } -} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java deleted file mode 100644 index e65d1324ef..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; - -import java.io.*; -import java.nio.ByteBuffer; -import java.util.*; - -public class Filereceiver extends Util -{ - private static final String USAGE_STRING = "filereceiver [options] <address> <directory>\n\nOptions:"; - - protected Filereceiver(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return false; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - final String directoryName = getArgs()[1]; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - final File directory = new File(directoryName); - if(directory.isDirectory() && directory.canWrite()) - { - File tmpDirectory = new File(directoryName, ".tmp"); - if(!tmpDirectory.exists()) - { - tmpDirectory.mkdir(); - } - - String[] unsettledFiles = tmpDirectory.list(); - - Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>(); - final Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>(); - - Accepted accepted = new Accepted(); - - for(String fileName : unsettledFiles) - { - File theFile = new File(tmpDirectory, fileName); - if(theFile.isFile()) - { - if(fileName.startsWith("~") && fileName.endsWith("~")) - { - theFile.delete(); - } - else - { - int splitPoint = fileName.indexOf("."); - String deliveryTagStr = fileName.substring(0,splitPoint); - String actualFileName = fileName.substring(splitPoint+1); - - byte[] bytes = new byte[deliveryTagStr.length()/2]; - - - for(int i = 0; i < bytes.length; i++) - { - char c = deliveryTagStr.charAt(2*i); - char d = deliveryTagStr.charAt(1+(2*i)); - - bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) - | (d <= '9' ? d - '0' : d - 'W')); - - } - Binary deliveryTag = new Binary(bytes); - unsettled.put(deliveryTag, accepted); - unsettledFileNames.put(deliveryTag, fileName); - } - } - - } - - Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), - unsettled); - - Map<Binary, Outcome> remoteUnsettled = r.getRemoteUnsettled(); - - for(Map.Entry<Binary, String> entry : unsettledFileNames.entrySet()) - { - if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) - { - - File tmpFile = new File(tmpDirectory, entry.getValue()); - final File dest = new File(directory, - entry.getValue().substring(entry.getValue().indexOf(".") + 1)); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - - tmpFile.renameTo(dest); - } - } - - - int credit = 10; - - r.setCredit(UnsignedInteger.valueOf(credit), true); - - - int received = 0; - Message m = null; - do - { - m = isBlock() && received == 0 ? r.receive() : r.receive(10000); - if(m != null) - { - if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) - { - final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); - final File unsettledFile = new File(tmpDirectory, - tmpFileName); - r.acknowledge(m, new Receiver.SettledAction() - { - public void onSettled(final Binary deliveryTag) - { - int splitPoint = tmpFileName.indexOf("."); - - String fileName = tmpFileName.substring(splitPoint+1); - - final File dest = new File(directory, fileName); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - unsettledFile.renameTo(dest); - unsettledFileNames.remove(deliveryTag); - } - }); - } - else - { - received++; - List<Section> sections = m.getPayload(); - Binary deliveryTag = m.getDeliveryTag(); - StringBuilder tagNameBuilder = new StringBuilder(); - - ByteBuffer dtbuf = deliveryTag.asByteBuffer(); - while(dtbuf.hasRemaining()) - { - tagNameBuilder.append(String.format("%02x", dtbuf.get())); - } - - - ApplicationProperties properties = null; - List<Binary> data = new ArrayList<Binary>(); - int totalSize = 0; - for(Section section : sections) - { - if(section instanceof ApplicationProperties) - { - properties = (ApplicationProperties) section; - } - else if(section instanceof AmqpValue) - { - AmqpValue value = (AmqpValue) section; - if(value.getValue() instanceof Binary) - { - Binary binary = (Binary) value.getValue(); - data.add(binary); - totalSize += binary.getLength(); - - } - else - { - // TODO exception - } - } - else if(section instanceof Data) - { - Data value = (Data) section; - Binary binary = value.getValue(); - data.add(binary); - totalSize += binary.getLength(); - - } - } - if(properties != null) - { - final String fileName = (String) properties.getValue().get("filename"); - byte[] fileData = new byte[totalSize]; - ByteBuffer buf = ByteBuffer.wrap(fileData); - int offset = 0; - for(Binary bin : data) - { - buf.put(bin.asByteBuffer()); - } - File outputFile = new File(tmpDirectory, "~"+fileName+"~"); - if(outputFile.exists()) - { - outputFile.delete(); - } - FileOutputStream fos = new FileOutputStream(outputFile); - fos.write(fileData); - fos.flush(); - fos.close(); - - final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + - fileName); - outputFile.renameTo(unsettledFile); - r.acknowledge(m, new Receiver.SettledAction() - { - public void onSettled(final Binary deliveryTag) - { - final File dest = new File(directory, fileName); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - unsettledFile.renameTo(dest); - - } - }); - - } - } - } - } - while(m != null); - - - r.close(); - } - else - { - System.err.println("No such directory: " + directoryName); - } - session.close(); - conn.close(); - } - catch (ConnectionException e) - { - e.printStackTrace(); - } - catch (FileNotFoundException e) - { - e.printStackTrace(); //TODO. - } - catch (IOException e) - { - e.printStackTrace(); //TODO. - } - - } - - public static void main(String[] args) - { - new Filereceiver(args).run(); - } -} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java deleted file mode 100644 index c7bcd99312..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.*; - -public class Filesender extends Util -{ - private static final String USAGE_STRING = "filesender [options] <address> <directory>\n\nOptions:"; - - protected Filesender(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return false; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - final String directoryName = getArgs()[1]; - - try - { - MessageDigest md5 = MessageDigest.getInstance("MD5"); - Connection conn = newConnection(); - - Session session = conn.createSession(); - - File directory = new File(directoryName); - if(directory.isDirectory() && directory.canWrite()) - { - - File tmpDirectory = new File(directoryName, ".tmp"); - if(!tmpDirectory.exists()) - { - tmpDirectory.mkdir(); - } - - String[] unsettledFiles = tmpDirectory.list(); - - - - Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>(); - Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>(); - for(String fileName : unsettledFiles) - { - File aFile = new File(tmpDirectory, fileName); - if(aFile.canRead() && aFile.canWrite()) - { - Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); - unsettled.put(deliveryTag, null); - unsettledFileNames.put(deliveryTag, fileName); - } - } - - - Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(), - unsettled); - - Map<Binary, DeliveryState> remoteUnsettled = s.getRemoteUnsettled(); - - for(Map.Entry<Binary, String> entry: unsettledFileNames.entrySet()) - { - if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) - { - (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue())); - } - } - - if(remoteUnsettled != null) - { - for(Map.Entry<Binary, DeliveryState> entry : remoteUnsettled.entrySet()) - { - if(entry.getValue() instanceof Accepted) - { - final String fileName = unsettledFileNames.get(entry.getKey()); - if(fileName != null) - { - - Message resumed = new Message(); - resumed.setDeliveryTag(entry.getKey()); - resumed.setDeliveryState(entry.getValue()); - resumed.setResume(Boolean.TRUE); - resumed.setSettled(Boolean.TRUE); - - - - final File unsettledFile = new File(tmpDirectory, fileName); - unsettledFile.delete(); - - s.send(resumed); - - } - - } - else if(entry.getValue() instanceof Received || entry.getValue() == null) - { - final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey())); - Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile); - resumed.setResume(Boolean.TRUE); - Sender.OutcomeAction action = new Sender.OutcomeAction() - { - public void onOutcome(Binary deliveryTag, Outcome outcome) - { - if(outcome instanceof Accepted) - { - unsettledFile.delete(); - } - } - }; - s.send(resumed, action); - - } - } - } - - - - String[] files = directory.list(); - - for(String fileName : files) - { - final File file = new File(directory, fileName); - - if(file.canRead() && file.canWrite() && !file.isDirectory()) - { - Message message = createMessageFromFile(md5, fileName, file); - - final File unsettledFile = new File(tmpDirectory, fileName); - - Sender.OutcomeAction action = new Sender.OutcomeAction() - { - public void onOutcome(Binary deliveryTag, Outcome outcome) - { - if(outcome instanceof Accepted) - { - unsettledFile.delete(); - } - } - }; - - file.renameTo(unsettledFile); - - s.send(message, action); - } - } - - s.close(); - } - else - { - System.err.println("No such directory: " + directory); - } - session.close(); - conn.close(); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException - { - FileInputStream fis = new FileInputStream(file); - byte[] data = new byte[(int) file.length()]; - - int read = fis.read(data); - - fis.close(); - - Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName)); - Section amqpValue = new Data(new Binary(data)); - Message message = new Message(Arrays.asList(applicationProperties, amqpValue)); - Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); - message.setDeliveryTag(deliveryTag); - md5.reset(); - return message; - } - - public static void main(String[] args) - { - new Filesender(args).run(); - } -} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java deleted file mode 100644 index a084c0bacc..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.commons.cli.*; -import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; - -import java.util.Collections; - -public class Receive extends Util -{ - private static final String USAGE_STRING = "receive [options] <address> \n\nOptions:"; - private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L); - private UnsignedLong _lastCorrelationId; - - public static void main(String[] args) - { - new Receive(args).run(); - } - - - public Receive(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return true; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - @Override - protected boolean hasFilterOption() - { - return true; - } - - protected void run() - { - - try - { - final String queue = getArgs()[0]; - - String message = ""; - - Connection conn = newConnection(); - - - Session session = conn.createSession(); - - Filter filter = null; - if(getFilter() != null) - { - String[] filterParts = getFilter().split("=",2); - if("exact-subject".equals(filterParts[0])) - { - filter = new ExactSubjectFilter(filterParts[1]); - } - else if("matching-subject".equals(filterParts[0])) - { - filter = new MatchingSubjectFilter(filterParts[1]); - } - else - { - System.err.println("Unknown filter type: " + filterParts[0]); - } - } - - Receiver r = - filter == null - ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink()) - : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null); - Transaction txn = null; - - int credit = 0; - int receivedCount = 0; - - if(!useStdIn()) - { - if(getArgs().length <= 2) - { - - Transaction txn2 = null; - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - txn2 = session.createSessionLocalTransaction(); - } - - for(int i = 0; i < getCount(); i++) - { - - if(credit == 0) - { - if(getCount() - i <= getWindowSize()) - { - credit = getCount() - i; - - } - else - { - credit = getWindowSize(); - - } - - { - r.setCredit(UnsignedInteger.valueOf(credit), false); - } - if(!isBlock()) - r.drain(); - } - - Message m = isBlock() ? r.receive() : r.receive(1000L); - credit--; - if(m==null) - { - break; - } - - - - r.acknowledge(m.getDeliveryTag(),txn); - - receivedCount++; - - System.out.println("Received Message : " + m.getPayload()); - } - - if(useTran()) - { - txn.commit(); - } - } - else - { - // TODO - } - } - else - { - // TODO - } - r.close(); - session.close(); - conn.close(); - System.out.println("Total Messages Received: " + receivedCount); - } - catch (ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (LinkDetachedException e) - { - e.printStackTrace(); - } - - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java deleted file mode 100644 index bce7bfcd9a..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -import java.util.Arrays; - -public class Request extends Util -{ - private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:"; - - public static void main(String[] args) - { - new Request(args).run(); - } - - public Request(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return true; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - public void run() - { - - try - { - - - final String queue = getArgs()[0]; - - String message = ""; - - Connection conn = newConnection(); - Session session = conn.createSession(); - - Connection conn2; - Session session2; - Receiver responseReceiver; - - if(isUseMultipleConnections()) - { - conn2 = newConnection(); - session2 = conn2.createSession(); - responseReceiver = session2.createTemporaryQueueReceiver(); - } - else - { - conn2 = null; - session2 = null; - responseReceiver = session.createTemporaryQueueReceiver(); - } - - - - - responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - - Sender s = session.createSender(queue, getWindowSize(), getMode(), null); - - Transaction txn = null; - - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - } - - int received = 0; - - if(getArgs().length >= 2) - { - message = getArgs()[1]; - if(message.length() < getMessageSize()) - { - StringBuilder builder = new StringBuilder(getMessageSize()); - builder.append(message); - for(int x = message.length(); x < getMessageSize(); x++) - { - builder.append('.'); - } - message = builder.toString(); - } - - for(int i = 0; i < getCount(); i++) - { - Properties properties = new Properties(); - properties.setMessageId(UnsignedLong.valueOf(i)); - properties.setReplyTo(responseReceiver.getAddress()); - - AmqpValue amqpValue = new AmqpValue(message); - Section[] sections = { new Header() , properties, amqpValue}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1, txn); - - Message responseMessage = responseReceiver.receive(false); - if(responseMessage != null) - { - responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn); - received++; - } - } - } - - if(txn != null) - { - txn.commit(); - } - - - while(received < getCount()) - { - Message responseMessage = responseReceiver.receive(); - responseReceiver.acknowledge(responseMessage.getDeliveryTag()); - received++; - } - - - - - s.close(); - session.close(); - conn.close(); - - if(session2 != null) - { - session2.close(); - conn2.close(); - } - } - catch (Exception e) - { - e.printStackTrace(); //TODO. - } - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return true; - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java deleted file mode 100644 index e29323eb80..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java +++ /dev/null @@ -1,342 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.TimeoutException; - -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; - -public class Respond extends Util -{ - private static final String USAGE_STRING = "respond [options] <address>\n\nOptions:"; - private Connection _conn; - private Session _session; - private Receiver _receiver; - private Transaction _txn; - private Map<String,Sender> _senders; - private UnsignedLong _responseMsgId = UnsignedLong.ZERO; - private Connection _conn2; - private Session _session2; - - public Respond(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return true; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasSingleLinkPerConnectionMode() - { - return true; - } - - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - public static void main(String[] args) - { - new Respond(args).run(); - } - - public void run() - { - try - { - - _senders = new HashMap<String, Sender>(); - - final String queue = getArgs()[0]; - - String message = ""; - - _conn = newConnection(); - - - - if(isUseMultipleConnections()) - { - _conn2 = newConnection(); - _session2 = _conn2.createSession(); - } - - - _session = _conn.createSession(); - - - _receiver = _session.createReceiver(queue, getMode()); - _txn = null; - - int credit = 0; - int receivedCount = 0; - _responseMsgId = UnsignedLong.ZERO; - - Random random = null; - int batch = 0; - List<Message> txnMessages = null; - if(useTran()) - { - if(getRollbackRatio() != 0) - { - random = new Random(); - } - batch = getBatchSize(); - _txn = _session.createSessionLocalTransaction(); - txnMessages = new ArrayList<Message>(batch); - } - - - for(int i = 0; receivedCount < getCount(); i++) - { - - if(credit == 0) - { - if(getCount() - i <= getWindowSize()) - { - credit = getCount() - i; - - } - else - { - credit = getWindowSize(); - - } - - _receiver.setCredit(UnsignedInteger.valueOf(credit), false); - - if(!isBlock()) - _receiver.drain(); - } - - Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L); - credit--; - if(m==null) - { - if(useTran() && batch != getBatchSize()) - { - _txn.commit(); - } - break; - } - - System.out.println("Received Message: " + m.getPayload()); - - respond(m); - - - - if(useTran()) - { - - txnMessages.add(m); - - if(--batch == 0) - { - - if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio()) - { - _txn.commit(); - txnMessages.clear(); - receivedCount += getBatchSize(); - } - else - { - System.out.println("Random Rollback"); - _txn.rollback(); - double result; - do - { - _txn = _session.createSessionLocalTransaction(); - - for(Message msg : txnMessages) - { - respond(msg); - } - - result = random.nextDouble(); - if(result<getRollbackRatio()) - { - _txn.rollback(); - } - else - { - _txn.commit(); - txnMessages.clear(); - receivedCount += getBatchSize(); - } - } - while(result < getRollbackRatio()); - } - _txn = _session.createSessionLocalTransaction(); - - batch = getBatchSize(); - } - } - else - { - receivedCount++; - } - - } - - - for(Sender s : _senders.values()) - { - s.close(); - } - - _receiver.close(); - _session.close(); - _conn.close(); - System.out.println("Received: " + receivedCount); - } - catch (Exception e) - { - e.printStackTrace(); //TODO. - } - } - - private void respond(Message m) - throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException, TimeoutException - { - List<Section> sections = m.getPayload(); - String replyTo = null; - Object correlationId = null; - for(Section section : sections) - { - if(section instanceof Properties) - { - replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue(); - correlationId = ((Properties) section).getMessageId(); - break; - } - } - - if(replyTo != null) - { - Sender s = _senders.get(replyTo); - if(s == null) - { - s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize()); - _senders.put(replyTo, s); - } - - List<Section> replySections = new ArrayList<Section>(sections); - - ListIterator<Section> sectionIterator = replySections.listIterator(); - - while(sectionIterator.hasNext()) - { - Section section = sectionIterator.next(); - if(section instanceof Properties) - { - Properties newProps = new Properties(); - newProps.setTo(replyTo); - newProps.setCorrelationId(correlationId); - newProps.setMessageId(_responseMsgId); - _responseMsgId = _responseMsgId.add(UnsignedLong.ONE); - sectionIterator.set(newProps); - } - } - - Message replyMessage = new Message(replySections); - System.out.println("Sent Message: " + replySections); - s.send(replyMessage, _txn); - - } - _receiver.acknowledge(m.getDeliveryTag(), _txn); - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java deleted file mode 100644 index 36aadc7851..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.LineNumberReader; -import java.util.Arrays; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -public class Send extends Util -{ - private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:"; - private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'}; - - - public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, ConnectionException - { - new Send(args).run(); - } - - - public Send(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return true; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return true; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - @Override - protected boolean hasSubjectOption() - { - return true; - } - - public void run() - { - - final String queue = getArgs()[0]; - - String message = ""; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - - Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName()); - - Transaction txn = null; - - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - } - - if(!useStdIn()) - { - if(getArgs().length <= 2) - { - if(getArgs().length == 2) - { - message = getArgs()[1]; - } - for(int i = 0; i < getCount(); i++) - { - - Properties properties = new Properties(); - properties.setMessageId(UnsignedLong.valueOf(i)); - if(getSubject() != null) - { - properties.setSubject(getSubject()); - } - Section bodySection; - byte[] bytes = (message + " " + i).getBytes(); - if(bytes.length < getMessageSize()) - { - byte[] origBytes = bytes; - bytes = new byte[getMessageSize()]; - System.arraycopy(origBytes,0,bytes,0,origBytes.length); - for(int x = origBytes.length; x < bytes.length; x++) - { - bytes[x] = (byte) '.'; - } - bodySection = new Data(new Binary(bytes)); - } - else - { - bodySection = new AmqpValue(message + " " + i); - } - - Section[] sections = {properties, bodySection}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1, txn); - } - } - else - { - for(int i = 1; i < getArgs().length; i++) - { - s.send(new Message(getArgs()[i]), txn); - } - - } - } - else - { - LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in)); - - - try - { - while((message = buf.readLine()) != null) - { - s.send(new Message(message), txn); - } - } - catch (IOException e) - { - // TODO - e.printStackTrace(); - } - } - - if(txn != null) - { - txn.commit(); - } - - s.close(); - - session.close(); - conn.close(); - } - catch (Exception e) - { - e.printStackTrace(); //TODO. - } - - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java deleted file mode 100644 index 4421c44a61..0000000000 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java +++ /dev/null @@ -1,531 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.commons.cli.*; - -import java.util.logging.*; - -public abstract class Util -{ - - private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); - private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); - private String _host; - private String _username; - private String _password; - private int _port; - private int _count; - private boolean _useStdIn; - private boolean _useTran; - private String[] _args; - private AcknowledgeMode _mode; - private boolean _block; - private int _frameSize; - private int _messageSize; - private String _responseQueue; - private int _batchSize; - private double _rollbackRatio; - private String _linkName; - private String _containerName; - private boolean _durableLink; - private boolean _useMultipleConnections; - private int _windowSize = 100; - private String _subject; - private String _filter; - private String _remoteHost; - private boolean _useSSL; - - protected Util(String[] args) - { - CommandLineParser cmdLineParse = new PosixParser(); - - Options options = new Options(); - options.addOption("h","help",false,"show this help message and exit"); - options.addOption(OptionBuilder.withLongOpt("host") - .withDescription( "host to connect to (default 0.0.0.0)" ) - .hasArg(true) - .withArgName("HOST") - .create('H')); - options.addOption(OptionBuilder.withLongOpt("username") - .withDescription( "username to use for authentication" ) - .hasArg(true) - .withArgName("USERNAME") - .create('u')); - options.addOption(OptionBuilder.withLongOpt("password") - .withDescription( "password to use for authentication" ) - .hasArg(true) - .withArgName("PASSWORD") - .create('w')); - options.addOption(OptionBuilder.withLongOpt("port") - .withDescription( "port to connect to (default 5672)" ) - .hasArg(true) - .withArgName("PORT") - .create('p')); - options.addOption(OptionBuilder.withLongOpt("frame-size") - .withDescription( "specify the maximum frame size" ) - .hasArg(true) - .withArgName("FRAME_SIZE") - .create('f')); - options.addOption(OptionBuilder.withLongOpt("container-name") - .withDescription( "Container name" ) - .hasArg(true) - .withArgName("CONTAINER_NAME") - .create('C')); - - options.addOption(OptionBuilder.withLongOpt("ssl") - .withDescription("Use SSL") - .create('S')); - - options.addOption(OptionBuilder.withLongOpt("remote-hostname") - .withDescription( "hostname to supply in the open frame" ) - .hasArg(true) - .withArgName("HOST") - .create('O')); - - if(hasBlockOption()) - options.addOption(OptionBuilder.withLongOpt("block") - .withDescription("block until messages arrive") - .create('b')); - - if(hasCountOption()) - options.addOption(OptionBuilder.withLongOpt("count") - .withDescription( "number of messages to send (default 1)" ) - .hasArg(true) - .withArgName("COUNT") - .create('c')); - if(hasModeOption()) - options.addOption(OptionBuilder.withLongOpt("acknowledge-mode") - .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" ) - .hasArg(true) - .withArgName("MODE") - .create('k')); - - if(hasSubjectOption()) - options.addOption(OptionBuilder.withLongOpt("subject") - .withDescription( "subject message property" ) - .hasArg(true) - .withArgName("SUBJECT") - .create('s')); - - - if(hasSingleLinkPerConnectionMode()) - options.addOption(OptionBuilder.withLongOpt("single-link-per-connection") - .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once") - .hasArg(false) - .create('Z')); - - if(hasFilterOption()) - options.addOption(OptionBuilder.withLongOpt("filter") - .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#") - .hasArg(true) - .withArgName("<TYPE>=<VALUE>") - .create('F')); - - - if(hasTxnOption()) - { - options.addOption("x","txn",false,"use transactions"); - options.addOption(OptionBuilder.withLongOpt("batch-size") - .withDescription( "transaction batch size (default: 1)" ) - .hasArg(true) - .withArgName("BATCH-SIZE") - .create('B')); - options.addOption(OptionBuilder.withLongOpt("rollback-ratio") - .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" ) - .hasArg(true) - .withArgName("RATIO") - .create('R')); - } - - if(hasLinkDurableOption()) - { - options.addOption("d","durable-link",false,"use a durable link"); - } - - if(hasStdInOption()) - options.addOption("i","stdin",false,"read messages from stdin (one message per line)"); - - options.addOption(OptionBuilder.withLongOpt("trace") - .withDescription("trace logging specified categories: RAW, FRM") - .hasArg(true) - .withArgName("TRACE") - .create('t')); - if(hasSizeOption()) - options.addOption(OptionBuilder.withLongOpt("message-size") - .withDescription( "size to pad outgoing messages to" ) - .hasArg(true) - .withArgName("SIZE") - .create('z')); - - if(hasResponseQueueOption()) - options.addOption(OptionBuilder.withLongOpt("response-queue") - .withDescription( "response queue to reply to" ) - .hasArg(true) - .withArgName("RESPONSE_QUEUE") - .create('r')); - - if(hasLinkNameOption()) - { - options.addOption(OptionBuilder.withLongOpt("link") - .withDescription( "link name" ) - .hasArg(true) - .withArgName("LINK") - .create('l')); - } - - if(hasWindowSizeOption()) - { - options.addOption(OptionBuilder.withLongOpt("window-size") - .withDescription("credit window size") - .hasArg(true) - .withArgName("WINDOW-SIZE") - .create('W')); - } - - CommandLine cmdLine = null; - try - { - cmdLine = cmdLineParse.parse(options, args); - - } - catch (ParseException e) - { - printUsage(options); - System.exit(-1); - } - - if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty()) - { - printUsage(options); - System.exit(0); - } - _host = cmdLine.getOptionValue('H',"0.0.0.0"); - _remoteHost = cmdLine.getOptionValue('O',null); - String portStr = cmdLine.getOptionValue('p',"5672"); - String countStr = cmdLine.getOptionValue('c',"1"); - - _useSSL = cmdLine.hasOption('S'); - - if(hasWindowSizeOption()) - { - String windowSizeStr = cmdLine.getOptionValue('W',"100"); - _windowSize = Integer.parseInt(windowSizeStr); - } - - if(hasSubjectOption()) - { - _subject = cmdLine.getOptionValue('s'); - } - - if(cmdLine.hasOption('u')) - { - _username = cmdLine.getOptionValue('u'); - } - - if(cmdLine.hasOption('w')) - { - _password = cmdLine.getOptionValue('w'); - } - - if(cmdLine.hasOption('F')) - { - _filter = cmdLine.getOptionValue('F'); - } - - _port = Integer.parseInt(portStr); - - _containerName = cmdLine.getOptionValue('C'); - - if(hasBlockOption()) - _block = cmdLine.hasOption('b'); - - if(hasLinkNameOption()) - _linkName = cmdLine.getOptionValue('l'); - - - if(hasLinkDurableOption()) - _durableLink = cmdLine.hasOption('d'); - - if(hasCountOption()) - _count = Integer.parseInt(countStr); - - if(hasStdInOption()) - _useStdIn = cmdLine.hasOption('i'); - - if(hasSingleLinkPerConnectionMode()) - _useMultipleConnections = cmdLine.hasOption('Z'); - - if(hasTxnOption()) - { - _useTran = cmdLine.hasOption('x'); - _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1")); - _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0")); - } - - if(hasModeOption()) - { - _mode = AcknowledgeMode.ALO; - - if(cmdLine.hasOption('k')) - { - _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k')); - } - } - - if(hasResponseQueueOption()) - { - _responseQueue = cmdLine.getOptionValue('r'); - } - - _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536")); - - if(hasSizeOption()) - { - _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1")); - } - - String categoriesList = cmdLine.getOptionValue('t'); - String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]"); - for(String cat : categories) - { - if(cat.equalsIgnoreCase("FRM")) - { - FRAME_LOGGER.setLevel(Level.FINE); - Formatter formatter = new Formatter() - { - @Override - public String format(final LogRecord record) - { - return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n"; - } - }; - for(Handler handler : FRAME_LOGGER.getHandlers()) - { - FRAME_LOGGER.removeHandler(handler); - } - Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); - FRAME_LOGGER.addHandler(handler); - } - else if (cat.equalsIgnoreCase("RAW")) - { - RAW_LOGGER.setLevel(Level.FINE); - Formatter formatter = new Formatter() - { - @Override - public String format(final LogRecord record) - { - return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n"; - } - }; - for(Handler handler : RAW_LOGGER.getHandlers()) - { - RAW_LOGGER.removeHandler(handler); - } - Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); - RAW_LOGGER.addHandler(handler); - } - } - - - _args = cmdLine.getArgs(); - - } - - protected boolean hasFilterOption() - { - return false; - } - - protected boolean hasSubjectOption() - { - return false; - } - - protected boolean hasWindowSizeOption() - { - return false; - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return false; - } - - protected abstract boolean hasLinkDurableOption(); - - protected abstract boolean hasLinkNameOption(); - - protected abstract boolean hasResponseQueueOption(); - - protected abstract boolean hasSizeOption(); - - protected abstract boolean hasBlockOption(); - - protected abstract boolean hasStdInOption(); - - protected abstract boolean hasTxnOption(); - - protected abstract boolean hasModeOption(); - - protected abstract boolean hasCountOption(); - - public String getHost() - { - return _host; - } - - public String getUsername() - { - return _username; - } - - public String getPassword() - { - return _password; - } - - public int getPort() - { - return _port; - } - - public int getCount() - { - return _count; - } - - public boolean useStdIn() - { - return _useStdIn; - } - - public boolean useTran() - { - return _useTran; - } - - public AcknowledgeMode getMode() - { - return _mode; - } - - public boolean isBlock() - { - return _block; - } - - public String[] getArgs() - { - return _args; - } - - public int getMessageSize() - { - return _messageSize; - } - - public String getResponseQueue() - { - return _responseQueue; - } - - public int getBatchSize() - { - return _batchSize; - } - - public double getRollbackRatio() - { - return _rollbackRatio; - } - - public String getLinkName() - { - return _linkName; - } - - public boolean isDurableLink() - { - return _durableLink; - } - - public boolean isUseMultipleConnections() - { - return _useMultipleConnections; - } - - public void setUseMultipleConnections(boolean useMultipleConnections) - { - _useMultipleConnections = useMultipleConnections; - } - - public String getSubject() - { - return _subject; - } - - public void setSubject(String subject) - { - _subject = subject; - } - - protected abstract void printUsage(final Options options); - - protected abstract void run(); - - - public Connection newConnection() throws ConnectionException - { - Container container = getContainerName() == null ? new Container() : new Container(getContainerName()); - return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container, - _remoteHost == null ? getHost() : _remoteHost, _useSSL, - 0) - : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize, - container, _remoteHost == null ? getHost() : _remoteHost, _useSSL, - 0); - } - - public String getContainerName() - { - return _containerName; - } - - public int getWindowSize() - { - return _windowSize; - } - - public void setWindowSize(int windowSize) - { - _windowSize = windowSize; - } - - public String getFilter() - { - return _filter; - } -} diff --git a/qpid/java/amqp-1-0-client/pom.xml b/qpid/java/amqp-1-0-client/pom.xml deleted file mode 100644 index 5e5e6a547a..0000000000 --- a/qpid/java/amqp-1-0-client/pom.xml +++ /dev/null @@ -1,53 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-java-build</artifactId> - <version>0.32-SNAPSHOT</version> - </parent> - - <artifactId>qpid-amqp-1-0-client</artifactId> - <name>Qpid AMQP 1.0 Client</name> - <description>AMQP 1.0 compliant client module</description> - - <dependencies> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-amqp-1-0-common</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <resources> - <resource> - <directory>src/main/resources</directory> - </resource> - <resource> - <directory>src/main/java</directory> - <includes> - <include>resources/</include> - </includes> - </resource> - </resources> - </build> - -</project> diff --git a/qpid/java/amqp-1-0-client/resources/LICENSE b/qpid/java/amqp-1-0-client/resources/LICENSE deleted file mode 100644 index de4b130f35..0000000000 --- a/qpid/java/amqp-1-0-client/resources/LICENSE +++ /dev/null @@ -1,204 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - diff --git a/qpid/java/amqp-1-0-client/resources/NOTICE b/qpid/java/amqp-1-0-client/resources/NOTICE deleted file mode 100644 index 8d1c3f3122..0000000000 --- a/qpid/java/amqp-1-0-client/resources/NOTICE +++ /dev/null @@ -1,5 +0,0 @@ -Apache Qpid -Copyright 2006-2012 Apache Software Foundation -This product includes software developed at -Apache Software Foundation (http://www.apache.org/) - diff --git a/qpid/java/amqp-1-0-client/resources/README.txt b/qpid/java/amqp-1-0-client/resources/README.txt deleted file mode 100644 index 35d25050fe..0000000000 --- a/qpid/java/amqp-1-0-client/resources/README.txt +++ /dev/null @@ -1,7 +0,0 @@ - -Documentation --------------- -All of our user documentation can be accessed at: - -http://qpid.apache.org/documentation.html - diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java deleted file mode 100644 index 05d176bc35..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -public enum AcknowledgeMode -{ - AMO, - ALO, - EO -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ChannelsExhaustedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ChannelsExhaustedException.java deleted file mode 100644 index 1f23d02e02..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ChannelsExhaustedException.java +++ /dev/null @@ -1,39 +0,0 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.amqp_1_0.client;
-
-public class ChannelsExhaustedException extends ConnectionException
-{
- protected ChannelsExhaustedException(final String message)
- {
- super(message);
- }
-
- public ChannelsExhaustedException(Throwable cause)
- {
- super(cause);
- }
-
- ChannelsExhaustedException()
- {
-
- }
-}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java deleted file mode 100644 index a4f9ac5a3a..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ /dev/null @@ -1,423 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.security.NoSuchAlgorithmException; -import java.security.Principal; -import java.util.ServiceLoader; -import java.util.concurrent.TimeoutException; - -import javax.net.ssl.SSLContext; - -import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; -import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; -import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.qpid.amqp_1_0.transport.Predicate; -import org.apache.qpid.amqp_1_0.type.FrameBody; -import org.apache.qpid.amqp_1_0.type.SaslFrameBody; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.transport.AmqpError; -import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; -import org.apache.qpid.amqp_1_0.type.transport.Error; - -public class Connection implements ExceptionHandler -{ - private static final int MAX_FRAME_SIZE = 65536; - - private String _address; - private ConnectionEndpoint _conn; - private int _sessionCount; - private Runnable _connectionErrorTask; - private Error _socketError; - - - public Connection(final String address, - final int port, - final String username, - final String password) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE); - } - - public Connection(final String address, - final int port, - final String username, - final String password, String remoteHostname) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final int maxFrameSize) throws ConnectionException - { - this(address,port,username,password,maxFrameSize,new Container()); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final Container container) throws ConnectionException - { - this(address,port,username,password,MAX_FRAME_SIZE,container); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final int maxFrameSize, - final Container container) throws ConnectionException - { - this(address,port,username,password,maxFrameSize,container, null); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final int maxFrameSize, - final Container container, - final String remoteHostname) throws ConnectionException - { - this(address,port,username,password,maxFrameSize,container,remoteHostname,false,-1); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final Container container, - final boolean ssl) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl,-1); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final String remoteHost, - final boolean ssl) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl,-1); - } - - public Connection(final String address, - final int port, - final String username, - final String password, - final Container container, - final String remoteHost, - final boolean ssl, - final int channelMax) throws ConnectionException - { - this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl, - channelMax); - } - - - public Connection(final String protocol, - final String address, - final int port, - final String username, - final String password, - final Container container, - final String remoteHost, - final SSLContext sslContext, - final int channelMax) throws ConnectionException - { - this(protocol, address, port, username, password,container,remoteHost,sslContext, - null, channelMax); - } - - public Connection(final String protocol, - final String address, - final int port, - final String username, - final String password, - final Container container, - final String remoteHost, - final SSLContext sslContext, - final SSLOptions sslOptions, - final int channelMax) throws ConnectionException - { - this(protocol, address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,sslContext, - sslOptions, channelMax); - } - - - public Connection(final String address, - final int port, - final String username, - final String password, - final int maxFrameSize, - final Container container, - final String remoteHostname, - boolean ssl, - int channelMax) throws ConnectionException - { - this(ssl?"amqps":"amqp",address,port,username,password,maxFrameSize,container, - remoteHostname, - getSslContext(ssl), - null, - channelMax); - } - - private static SSLContext getSslContext(final boolean ssl) throws ConnectionException - { - try - { - return ssl ? SSLContext.getDefault() : null; - } - catch (NoSuchAlgorithmException e) - { - throw new ConnectionException(e); - } - } - - public Connection(final String protocol, - final String address, - final int port, - final String username, - final String password, - final int maxFrameSize, - final Container container, - final String remoteHostname, - SSLContext sslContext, - final SSLOptions sslOptions, int channelMax) throws ConnectionException - { - - _address = address; - - - Principal principal = username == null ? null : new Principal() - { - - public String getName() - { - return username; - } - }; - _conn = new ConnectionEndpoint(container, principal, password); - if(channelMax >= 0) - { - _conn.setChannelMax((short)channelMax); - } - _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); - _conn.setRemoteHostname(remoteHostname); - - ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn); - - ConnectionHandler.BytesSource src; - - if(_conn.requiresSASL()) - { - ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn); - - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)3, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),_conn.getDescribedTypeRegistry()), - new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry()) - ); - - _conn.setSaslFrameOutput(saslOut); - } - else - { - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),_conn.getDescribedTypeRegistry()) - ); - } - - TransportProvider transportProvider = getTransportProvider(protocol); - - transportProvider.connect(_conn,address,port, sslContext, sslOptions, this); - - - try - { - _conn.open(); - } - catch(RuntimeException e) - { - transportProvider.close(); - } - - } - - private TransportProvider getTransportProvider(final String protocol) throws ConnectionException - { - TCPTransportProviderFactory tcpTransportProviderFactory = new TCPTransportProviderFactory(); - if(tcpTransportProviderFactory.getSupportedTransports().contains(protocol)) - { - return tcpTransportProviderFactory.getProvider(protocol); - } - - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - ServiceLoader<TransportProviderFactory> providerFactories = ServiceLoader.load(TransportProviderFactory.class, classLoader); - - for(TransportProviderFactory tpf : providerFactories) - { - if(tpf.getSupportedTransports().contains(protocol)) - { - return tpf.getProvider(protocol); - } - } - - throw new ConnectionException("Unknown protocol: " + protocol); - } - - private Connection(ConnectionEndpoint endpoint) - { - _conn = endpoint; - } - - - public Session createSession() throws ConnectionException - { - checkNotClosed(); - Session session = new Session(this,String.valueOf(_sessionCount++)); - return session; - } - - void checkNotClosed() throws ConnectionClosedException - { - if(getEndpoint().isClosed()) - { - Error remoteError = getEndpoint().getRemoteError(); - if(remoteError == null) - { - remoteError = new Error(); - remoteError.setDescription("Connection closed for unknown reason"); - - } - throw new ConnectionClosedException(remoteError); - } - } - - public ConnectionEndpoint getEndpoint() - { - return _conn; - } - - public void awaitOpen() throws TimeoutException, InterruptedException - { - getEndpoint().waitUntil(new Predicate() - { - @Override - public boolean isSatisfied() - { - return getEndpoint().isOpen() || getEndpoint().isClosed(); - } - }); - - } - - public void close() throws ConnectionErrorException - { - _conn.close(); - - try - { - _conn.waitUntil(new Predicate() - { - @Override - public boolean isSatisfied() - { - return _conn.closedForInput(); - } - }); - } - catch (InterruptedException e) - { - throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Interrupted while waiting for connection closure"); - } - catch (TimeoutException e) - { - throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Timed out while waiting for connection closure"); - } - if(_conn.getRemoteError() != null) - { - throw new ConnectionErrorException(_conn.getRemoteError()); - } - - } - - /** - * Set the connection error task that will be used as a callback for any socket read/write errors. - * - * @param connectionErrorTask connection error task - */ - public void setConnectionErrorTask(Runnable connectionErrorTask) - { - _connectionErrorTask = connectionErrorTask; - } - - /** - * Return the connection error for any socket read/write error that has occurred - * - * @return connection error - */ - public Error getConnectionError() - { - return _socketError; - } - - @Override - public void handleException(Exception exception) - { - Error socketError = new Error(); - socketError.setDescription(exception.getClass() + ": " + exception.getMessage()); - socketError.setCondition(ConnectionError.SOCKET_ERROR); - _socketError = socketError; - if(_connectionErrorTask != null) - { - Thread thread = new Thread(_connectionErrorTask); - thread.start(); - } - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionClosedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionClosedException.java deleted file mode 100644 index a434a4aaaf..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionClosedException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -public class ConnectionClosedException extends ConnectionErrorException -{ - - public ConnectionClosedException(org.apache.qpid.amqp_1_0.type.transport.Error remoteError) - { - super(remoteError); - } - -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java deleted file mode 100644 index 82f29ea4b1..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.ErrorCondition; -import org.apache.qpid.amqp_1_0.type.transport.Error; - -public class ConnectionErrorException extends ConnectionException -{ - protected final Error _remoteError; - - public ConnectionErrorException(ErrorCondition condition,final String description) - { - this(new Error(condition, description)); - } - - public ConnectionErrorException(Error remoteError) - { - super(remoteError.getDescription() == null ? remoteError.toString() : remoteError.getDescription()); - _remoteError = remoteError; - } - - public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError() - { - return _remoteError; - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java deleted file mode 100644 index 569c1f129d..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -public class ConnectionException extends Exception -{ - protected ConnectionException(final String message) - { - super(message); - } - - public ConnectionException(Throwable cause) - { - super(cause); - } - - ConnectionException() - { - - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java deleted file mode 100644 index 45b00255f2..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.transport.Error; - -public class LinkDetachedException extends Exception -{ - private final org.apache.qpid.amqp_1_0.type.transport.Error _remoteError; - - public LinkDetachedException(Error remoteError) - { - super(); - _remoteError = remoteError; - } - - public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError() - { - return _remoteError; - } - -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java deleted file mode 100644 index c4f9783c89..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java +++ /dev/null @@ -1,321 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; -import org.apache.qpid.amqp_1_0.type.messaging.Footer; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; - -public class Message -{ - - private static final Map<Class<? extends Section>, Collection<Class<? extends Section>>> VALID_NEXT_SECTIONS = new HashMap<>(); - - static - { - VALID_NEXT_SECTIONS.put(null, Arrays.asList(Header.class, - DeliveryAnnotations.class, - MessageAnnotations.class, - Properties.class, - ApplicationProperties.class, - AmqpValue.class, - AmqpSequence.class, - Data.class)); - - VALID_NEXT_SECTIONS.put(Header.class, Arrays.asList(DeliveryAnnotations.class, - MessageAnnotations.class, - Properties.class, - ApplicationProperties.class, - AmqpValue.class, - AmqpSequence.class, - Data.class)); - - VALID_NEXT_SECTIONS.put(DeliveryAnnotations.class, Arrays.asList(MessageAnnotations.class, - Properties.class, - ApplicationProperties.class, - AmqpValue.class, - AmqpSequence.class, - Data.class)); - - VALID_NEXT_SECTIONS.put(MessageAnnotations.class, Arrays.asList(Properties.class, - ApplicationProperties.class, - AmqpValue.class, - AmqpSequence.class, - Data.class)); - - VALID_NEXT_SECTIONS.put(Properties.class, Arrays.asList(ApplicationProperties.class, - AmqpValue.class, - AmqpSequence.class, - Data.class)); - - - VALID_NEXT_SECTIONS.put(ApplicationProperties.class, Arrays.asList(AmqpValue.class, - AmqpSequence.class, - Data.class)); - - VALID_NEXT_SECTIONS.put(AmqpValue.class, Arrays.<Class<? extends Section>>asList(Footer.class, null)); - - VALID_NEXT_SECTIONS.put(AmqpSequence.class, Arrays.asList(AmqpSequence.class, - Footer.class, null)); - - VALID_NEXT_SECTIONS.put(Data.class, Arrays.asList(Data.class, Footer.class, null)); - - VALID_NEXT_SECTIONS.put(Footer.class, Collections.<Class<? extends Section>>singletonList(null)); - - - } - - - private Binary _deliveryTag; - private List<Section> _payload = new ArrayList<Section>(); - private Boolean _resume; - private boolean _settled; - private DeliveryState _deliveryState; - private Receiver _receiver; - - - public Message() - { - } - - public Message(Collection<Section> sections) - { - this(sections, true); - } - - public Message(Collection<Section> sections, boolean validate) - { - _payload.addAll(validate ? validateOrReorder(sections) : sections); - } - - public Message(Section section) - { - this(Collections.singletonList(section)); - } - - public Message(String message) - { - this(new AmqpValue(message)); - } - - - private static Collection<Section> validateOrReorder(final Collection<Section> providedSections) - { - Collection<Section> validatedSections; - if(providedSections == null) - { - validatedSections = Collections.emptyList(); - } - else if(isValidOrder(providedSections)) - { - validatedSections = providedSections; - } - else - { - validatedSections = reorderSections(providedSections); - } - return validatedSections; - } - - private static Collection<Section> reorderSections(final Collection<Section> providedSections) - { - Collection<Section> validSections = new ArrayList<>(); - List<Section> originalSection = new ArrayList<>(providedSections); - validSections.addAll(getAndRemoveSections(Header.class, originalSection, false)); - validSections.addAll(getAndRemoveSections(DeliveryAnnotations.class, originalSection, false)); - validSections.addAll(getAndRemoveSections(MessageAnnotations.class, originalSection, false)); - validSections.addAll(getAndRemoveSections(Properties.class, originalSection, false)); - validSections.addAll(getAndRemoveSections(ApplicationProperties.class, originalSection, false)); - - final List<AmqpValue> valueSections = getAndRemoveSections(AmqpValue.class, originalSection, false); - final List<AmqpSequence> sequenceSections = getAndRemoveSections(AmqpSequence.class, originalSection, true); - final List<Data> dataSections = getAndRemoveSections(Data.class, originalSection, true); - - if(valueSections.isEmpty() && sequenceSections.isEmpty() && dataSections.isEmpty()) - { - throw new IllegalArgumentException("Message must contain one of Data, AmqpValue or AmqpSequence"); - } - if((!valueSections.isEmpty() && (!sequenceSections.isEmpty() || !dataSections.isEmpty())) - || (!sequenceSections.isEmpty() && !dataSections.isEmpty())) - { - throw new IllegalArgumentException("Only one type of content Data, AmqpValue or AmqpSequence can be used"); - } - validSections.addAll(valueSections); - validSections.addAll(sequenceSections); - validSections.addAll(dataSections); - - validSections.addAll(getAndRemoveSections(Footer.class, originalSection, false)); - - if(!originalSection.isEmpty()) - { - throw new IllegalArgumentException("Invalid section type: " + originalSection.get(0).getClass().getName()); - } - return validSections; - } - - private static <T extends Section> List<T> getAndRemoveSections(Class<T> clazz, - List<Section> sections, - boolean allowMultiple) - { - List<T> desiredSections = new ArrayList<>(); - ListIterator<Section> iterator = sections.listIterator(); - while(iterator.hasNext()) - { - Section s = iterator.next(); - if(s.getClass() == clazz) - { - desiredSections.add((T)s); - iterator.remove(); - } - } - if(desiredSections.size() > 1 && !allowMultiple) - { - throw new IllegalArgumentException("Multiple " + clazz.getSimpleName() + " sections are not allowed"); - } - return desiredSections; - } - - private static boolean isValidOrder(final Collection<Section> providedSections) - { - Class<? extends Section> previousSection = null; - final Iterator<? extends Section> it = providedSections.iterator(); - while(it.hasNext()) - { - Collection<Class<? extends Section>> validSections = VALID_NEXT_SECTIONS.get(previousSection); - Section next = it.next(); - Class<? extends Section> sectionClass = next.getClass(); - if(validSections == null || !validSections.contains(sectionClass)) - { - return false; - } - else - { - previousSection = sectionClass; - } - } - Collection<Class<? extends Section>> validSections = VALID_NEXT_SECTIONS.get(previousSection); - return validSections != null && validSections.contains(null); - } - - - - public Binary getDeliveryTag() - { - return _deliveryTag; - } - - public void setDeliveryTag(Binary deliveryTag) - { - _deliveryTag = deliveryTag; - } - - public List<Section> getPayload() - { - return Collections.unmodifiableList(_payload); - } - - private <T extends Section> T getSection(Class<T> clazz) - { - for(Section s : _payload) - { - if(clazz.isAssignableFrom(s.getClass())) - { - return (T) s; - } - } - return null; - } - - public ApplicationProperties getApplicationProperties() - { - return getSection(ApplicationProperties.class); - } - - public Properties getProperties() - { - return getSection(Properties.class); - } - - public Header getHeader() - { - return getSection(Header.class); - } - - - public void setResume(final Boolean resume) - { - _resume = resume; - } - - public boolean isResume() - { - return Boolean.TRUE.equals(_resume); - } - - public void setDeliveryState(DeliveryState state) - { - _deliveryState = state; - } - - public DeliveryState getDeliveryState() - { - return _deliveryState; - } - - public void setSettled(boolean settled) - { - _settled = settled; - } - - public boolean getSettled() - { - return _settled; - } - - public void setReceiver(final Receiver receiver) - { - _receiver = receiver; - } - - public Receiver getReceiver() - { - return _receiver; - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java deleted file mode 100644 index 5d4374fec5..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ /dev/null @@ -1,670 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeoutException; - -import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.Predicate; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.ErrorCondition; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.Modified; -import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; -import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.AmqpError; -import org.apache.qpid.amqp_1_0.type.transport.Detach; -import org.apache.qpid.amqp_1_0.type.transport.Error; -import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; - -public class Receiver implements DeliveryStateHandler -{ - private static final ErrorCondition UNKNOWN_ERROR_CONDITION = new ErrorCondition() - { - @Override - public Symbol getValue() - { - return Symbol.valueOf("Unknown"); - } - - @Override - public String toString() - { - return getValue().toString(); - } - }; - private ReceivingLinkEndpoint _endpoint; - private int _id; - private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100); - private Session _session; - - private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>(); - private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>(); - private MessageArrivalListener _messageArrivalListener; - private org.apache.qpid.amqp_1_0.type.transport.Error _error; - private Runnable _remoteErrorTask; - - public Receiver(final Session session, - final String linkName, - final Target target, - final Source source, - final AcknowledgeMode ackMode) throws ConnectionErrorException - { - this(session, linkName, target, source, ackMode, false); - } - - public Receiver(final Session session, - final String linkName, - final Target target, - final Source source, - final AcknowledgeMode ackMode, - boolean isDurable) throws ConnectionErrorException - { - this(session,linkName,target,source,ackMode,isDurable,null); - } - - public Receiver(final Session session, - final String linkName, - final Target target, - final Source source, - final AcknowledgeMode ackMode, - final boolean isDurable, - final Map<Binary,Outcome> unsettled) throws ConnectionErrorException - { - - session.getConnection().checkNotClosed(); - _session = session; - if(isDurable) - { - source.setDurable(TerminusDurability.UNSETTLED_STATE); - source.setExpiryPolicy(TerminusExpiryPolicy.NEVER); - } - else if(source != null) - { - source.setDurable(TerminusDurability.NONE); - source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); - } - _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source, - UnsignedInteger.ZERO); - - _endpoint.setDeliveryStateHandler(this); - - switch(ackMode) - { - case ALO: - _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - break; - case AMO: - _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - break; - case EO: - _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND); - break; - - } - - _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener() - { - @Override public void messageTransfer(final Transfer xfr) - { - _prefetchQueue.add(xfr); - postPrefetchAction(); - } - - @Override - public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) - { - _error = detach.getError(); - if(detach.getError()!=null) - { - remoteError(); - } - super.remoteDetached(endpoint, detach); - } - }); - - _endpoint.setLocalUnsettled(unsettled); - _endpoint.attach(); - - try - { - _endpoint.waitUntil(new Predicate() - { - - @Override - public boolean isSatisfied() - { - return _endpoint.isAttached() || _endpoint.isDetached(); - } - }); - } - catch (TimeoutException e) - { - throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for attach"); - } - catch (InterruptedException e) - { - throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for attach"); - } - - if(_endpoint.getSource() == null) - { - try - { - _endpoint.waitUntil(new Predicate() - { - @Override - public boolean isSatisfied() - { - return _endpoint.isDetached(); - } - }); - } - catch (TimeoutException e) - { - throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for detach following failed attach"); - } - catch (InterruptedException e) - { - throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for detach following failed attach"); - } - - Error error = getError() == null - ? new Error(UNKNOWN_ERROR_CONDITION, "Unknown") - : getError(); - - - ErrorCondition condition = error.getCondition() == null ? UNKNOWN_ERROR_CONDITION : error.getCondition(); - - throw new ConnectionErrorException(condition, - error.getDescription() == null - ? "AMQP error: '" + condition.toString() - + "' when attempting to create a receiver" - + (source != null ? " from: '" + source.getAddress() +"'" : "") - : error.getDescription()); - } - } - - private void remoteError() - { - if(_remoteErrorTask != null) - { - Thread thread = new Thread(_remoteErrorTask); - thread.start(); - } - } - - private void postPrefetchAction() - { - if(_messageArrivalListener != null) - { - _messageArrivalListener.messageArrived(this); - } - } - - public void setCredit(UnsignedInteger credit, boolean window) - { - _endpoint.setLinkCredit(credit); - _endpoint.setCreditWindow(window); - - } - - - public String getAddress() - { - return ((Source)_endpoint.getSource()).getAddress(); - } - - public Map getFilter() - { - return ((Source)_endpoint.getSource()).getFilter(); - } - - public Message receive() - { - return receive(-1L); - } - - public Message receive(boolean wait) - { - return receive(wait ? -1L : 0L); - } - - // 0 means no wait, -1 wait forever - public Message receive(long wait) - { - Message m = null; - Transfer xfr; - long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L; - - while((xfr = receiveFromPrefetch(wait)) != null ) - { - - if(!Boolean.TRUE.equals(xfr.getAborted())) - { - Binary deliveryTag = xfr.getDeliveryTag(); - Boolean resume = xfr.getResume(); - - List<Section> sections = new ArrayList<Section>(); - List<ByteBuffer> payloads = new ArrayList<ByteBuffer>(); - int totalSize = 0; - - boolean hasMore; - do - { - hasMore = Boolean.TRUE.equals(xfr.getMore()); - - ByteBuffer buf = xfr.getPayload(); - - if(buf != null) - { - - totalSize += buf.remaining(); - - payloads.add(buf); - } - if(hasMore) - { - xfr = receiveFromPrefetch(-1l); - if(xfr== null) - { - // TODO - this is wrong!!!! - System.out.println("eeek"); - } - } - } - while(hasMore && !Boolean.TRUE.equals(xfr.getAborted())); - - if(!Boolean.TRUE.equals(xfr.getAborted())) - { - ByteBuffer allPayload = ByteBuffer.allocate(totalSize); - for(ByteBuffer payload : payloads) - { - allPayload.put(payload); - } - allPayload.flip(); - SectionDecoder decoder = _session.getSectionDecoder(); - - try - { - sections = decoder.parseAll(allPayload); - } - catch (AmqpErrorException e) - { - // todo - throw a sensible error - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - m = new Message(sections, false); - m.setDeliveryTag(deliveryTag); - m.setResume(resume); - m.setReceiver(this); - break; - } - } - - if(wait > 0L) - { - wait = endTime - System.currentTimeMillis(); - if(wait <=0L) - { - break; - } - } - } - - - return m; - - } - - private Transfer receiveFromPrefetch(long wait) - { - long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L); - final Object lock = _endpoint.getLock(); - synchronized(lock) - { - Transfer xfr; - while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached() - && wait != 0) - { - try - { - if(wait>0L) - { - lock.wait(wait); - } - else if(wait<0L) - { - lock.wait(); - } - } - catch (InterruptedException e) - { - return null; - } - if(wait > 0L) - { - wait = endTime - System.currentTimeMillis(); - if(wait <= 0L) - { - break; - } - } - - } - if(xfr != null) - { - _prefetchQueue.poll(); - - } - - return xfr; - } - - } - - - public void release(final Message m) - { - release(m.getDeliveryTag()); - } - - public void release(Binary deliveryTag) - { - update(new Released(), deliveryTag, null, null); - } - - - public void modified(Binary tag) - { - final Modified outcome = new Modified(); - outcome.setDeliveryFailed(true); - - update(outcome, tag, null, null); - } - - public void acknowledge(final Message m) - { - acknowledge(m.getDeliveryTag()); - } - - public void acknowledge(final Message m, SettledAction a) - { - acknowledge(m.getDeliveryTag(), a); - } - - - public void acknowledge(final Message m, Transaction txn) - { - acknowledge(m.getDeliveryTag(), txn); - } - - - public void acknowledge(final Binary deliveryTag) - { - acknowledge(deliveryTag, null, null); - } - - - public void acknowledge(final Binary deliveryTag, SettledAction a) - { - acknowledge(deliveryTag, null, a); - } - - public void acknowledge(final Binary deliveryTag, final Transaction txn) - { - acknowledge(deliveryTag, txn, null); - } - - public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action) - { - update(new Accepted(), deliveryTag, txn, action); - } - - public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action) - { - - DeliveryState state; - if(txn != null) - { - TransactionalState txnState = new TransactionalState(); - txnState.setOutcome(outcome); - txnState.setTxnId(txn.getTxnId()); - state = txnState; - } - else - { - state = (DeliveryState) outcome; - } - boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode()); - - if(!(settled || action == null)) - { - _unsettledMap.put(deliveryTag, action); - } - - _endpoint.updateDisposition(deliveryTag,state, settled); - } - - public Error getError() - { - return _error; - } - - public void acknowledgeAll(Message m) - { - acknowledgeAll(m.getDeliveryTag()); - } - - public void acknowledgeAll(Binary deliveryTag) - { - acknowledgeAll(deliveryTag, null, null); - } - - public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action) - { - updateAll(new Accepted(), deliveryTag, txn, action); - } - - public void updateAll(Outcome outcome, Binary deliveryTag) - { - updateAll(outcome, deliveryTag, null, null); - } - - public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action) - { - DeliveryState state; - - if(txn != null) - { - TransactionalState txnState = new TransactionalState(); - txnState.setOutcome(outcome); - txnState.setTxnId(txn.getTxnId()); - state = txnState; - } - else - { - state = (DeliveryState) outcome; - } - boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode()); - - if(!(settled || action == null)) - { - _unsettledMap.put(deliveryTag, action); - } - _endpoint.updateAllDisposition(deliveryTag, state, settled); - } - - - - public void close() - { - _endpoint.setTarget(null); - _endpoint.close(); - Message msg; - while((msg = receive(0l)) != null) - { - release(msg); - } - _session.removeReceiver(this); - - } - - - public void detach() - { - _endpoint.setTarget(null); - _endpoint.detach(); - Message msg; - while((msg = receive(0l)) != null) - { - release(msg); - } - - } - - public void drain() - { - _endpoint.drain(); - } - - /** - * Waits for the receiver to drain or a message to be available to be received. - * @return true if the receiver has been drained. - */ - public boolean drainWait() - { - final Object lock = _endpoint.getLock(); - synchronized(lock) - { - try - { - while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() ) - { - lock.wait(); - } - } - catch (InterruptedException e) - { - } - } - return _prefetchQueue.peek()==null && _endpoint.isDrained(); - } - - /** - * Clears the receiver drain so that message delivery can resume. - */ - public void clearDrain() - { - _endpoint.clearDrain(); - } - - public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn) - { - _endpoint.setLinkCredit(credit); - _endpoint.setTransactionId(txn == null ? null : txn.getTxnId()); - _endpoint.setCreditWindow(false); - - } - - public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled) - { - if(Boolean.TRUE.equals(settled)) - { - SettledAction action = _unsettledMap.remove(deliveryTag); - if(action != null) - { - action.onSettled(deliveryTag); - } - } - } - - public Map<Binary, Outcome> getRemoteUnsettled() - { - return _endpoint.getInitialUnsettledMap(); - } - - - public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener) - { - synchronized(_endpoint.getLock()) - { - _messageArrivalListener = messageArrivalListener; - int prefetchSize = _prefetchQueue.size(); - for(int i = 0; i < prefetchSize; i++) - { - postPrefetchAction(); - } - } - } - - public Session getSession() - { - return _session; - } - - public org.apache.qpid.amqp_1_0.type.Source getSource() - { - return _endpoint.getSource(); - } - - public static interface SettledAction - { - public void onSettled(Binary deliveryTag); - } - - - public interface MessageArrivalListener - { - void messageArrived(Receiver receiver); - } - - public void setRemoteErrorListener(Runnable listener) - { - _remoteErrorTask = listener; - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLOptions.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLOptions.java deleted file mode 100644 index 1558b2043b..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLOptions.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -public class SSLOptions -{ - private final List<String> _enabledProtocols; - private final List<String> _disabledProtocols; - - public SSLOptions(String enabledProtocols, String disabledProtocols) - { - if(enabledProtocols == null) - { - enabledProtocols = System.getProperty("qpid.ssl.enabledProtocols"); - } - - if(disabledProtocols == null) - { - disabledProtocols = System.getProperty("qpid.ssl.disabledProtocols", SSLUtil.SSLV3_PROTOCOL); - } - - if(enabledProtocols == null) - { - _enabledProtocols = null; - } - else - { - _enabledProtocols = Collections.unmodifiableList(Arrays.asList(enabledProtocols.split(","))); - } - - if(disabledProtocols == null) - { - _disabledProtocols = null; - } - else - { - _disabledProtocols = Collections.unmodifiableList(Arrays.asList(disabledProtocols.split(","))); - } - } - - public SSLOptions(final List<String> enabledProtocols, final List<String> disabledProtocols) - { - this._enabledProtocols = enabledProtocols == null ? Collections.<String>emptyList() : Collections.unmodifiableList(new ArrayList<>(enabledProtocols)); - this._disabledProtocols = disabledProtocols == null ? Collections.<String>emptyList() : Collections.unmodifiableList(new ArrayList<>(disabledProtocols)); - } - - public List<String> getEnabledProtocols() - { - return _enabledProtocols; - } - - public List<String> getDisabledProtocols() - { - return _disabledProtocols; - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java deleted file mode 100644 index 7bcf796fa9..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.Socket; -import java.security.GeneralSecurityException; -import java.security.KeyStore; -import java.security.NoSuchAlgorithmException; -import java.security.NoSuchProviderException; -import java.security.Principal; -import java.security.PrivateKey; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.logging.Logger; - -import javax.net.ssl.KeyManager; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import javax.net.ssl.X509ExtendedKeyManager; - -public class SSLUtil -{ - public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS"; - public static final String SSLV3_PROTOCOL = "SSLv3"; - - - private static final Logger LOGGER = Logger.getLogger(SSLUtil.class.getName()); - - - public static SSLContext buildSslContext(final String certAlias, - final String keyStorePath, - final String keyStoreType, - final String keyStorePassword, - final String keyManagerFactoryAlgorithm, - final String trustStorePath, - final String trustStorePassword, - final String trustStoreType, - final String trustManagerFactoryAlgorithm, - final String sslProtocol, - final String sslProvider) throws GeneralSecurityException, IOException - { - - - SSLContext sslContext = getSslContext(sslProtocol, sslProvider); - - final TrustManager[] trustManagers; - final KeyManager[] keyManagers; - - if (trustStorePath != null) - { - final KeyStore ts = getInitializedKeyStore(trustStorePath, trustStorePassword, trustStoreType); - final TrustManagerFactory tmf = TrustManagerFactory.getInstance(trustManagerFactoryAlgorithm); - - tmf.init(ts); - - trustManagers = tmf.getTrustManagers(); - } - else - { - trustManagers = null; - } - - if (keyStorePath != null) - { - if (certAlias != null) - { - keyManagers = new KeyManager[] { new QpidClientX509KeyManager( - certAlias, keyStorePath, keyStoreType, keyStorePassword, - keyManagerFactoryAlgorithm) }; - } - else - { - final KeyStore ks = SSLUtil.getInitializedKeyStore(keyStorePath, keyStorePassword, keyStoreType); - - char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray(); - // Set up key manager factory to use our key store - final KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm); - kmf.init(ks, keyStoreCharPassword); - keyManagers = kmf.getKeyManagers(); - } - } - else - { - keyManagers = null; - } - - - sslContext.init(keyManagers, trustManagers, null); - - return sslContext; - } - - private static SSLContext getSslContext(final String sslProtocol, - final String sslProvider) throws NoSuchAlgorithmException - { - - final String sslProviderName = sslProvider != null ? sslProvider : System.getProperty("qpid.ssl.contextProvider"); - final String sslProtocolName = sslProtocol != null ? sslProtocol : System.getProperty("qpid.ssl.contextProtocol", TRANSPORT_LAYER_SECURITY_CODE); - - SSLContext sslContext = null; - if(sslProviderName != null && sslProtocolName != null) - { - try - { - sslContext = SSLContext.getInstance(sslProtocolName, sslProviderName); - } - catch(NoSuchProviderException e) - { - LOGGER.info("Unknown SSL Context Provider '"+ sslProviderName + "' will use the default"); - } - catch (NoSuchAlgorithmException e) - { - LOGGER.info("Unknown SSL protocol '" + sslProtocolName - + "' when using the provider '" + sslProviderName + "' will use the default provider"); - } - } - if(sslContext == null && sslProtocolName != null) - { - try - { - sslContext = SSLContext.getInstance(sslProtocolName); - } - catch(NoSuchAlgorithmException e) - { - LOGGER.info("Unknown SSL protocol '" + sslProtocolName + - "' will use '"+TRANSPORT_LAYER_SECURITY_CODE+"'"); - } - } - if(sslContext == null) - { - sslContext = SSLContext.getInstance(TRANSPORT_LAYER_SECURITY_CODE); - } - return sslContext; - } - - public static X509Certificate[] getClientCertificates(final String alias, - final String keyStorePath, - final String keyStorePassword, - final String keyStoreType, - final String keyManagerFactoryAlgorithm) - throws GeneralSecurityException, IOException - { - return (new QpidClientX509KeyManager(alias,keyStorePath,keyStoreType,keyStorePassword,keyManagerFactoryAlgorithm)).getCertificateChain(alias); - } - - public static KeyStore getInitializedKeyStore(String storePath, String storePassword, String keyStoreType) throws GeneralSecurityException, IOException - { - KeyStore ks = KeyStore.getInstance(keyStoreType); - InputStream in = null; - try - { - File f = new File(storePath); - if (f.exists()) - { - in = new FileInputStream(f); - } - else - { - in = Thread.currentThread().getContextClassLoader().getResourceAsStream(storePath); - } - if (in == null && !"PKCS11".equalsIgnoreCase(keyStoreType)) // PKCS11 will not require an explicit path - { - throw new IOException("Unable to load keystore resource: " + storePath); - } - - char[] storeCharPassword = storePassword == null ? null : storePassword.toCharArray(); - - ks.load(in, storeCharPassword); - } - finally - { - if (in != null) - { - //noinspection EmptyCatchBlock - try - { - in.close(); - } - catch (IOException ignored) - { - } - } - } - return ks; - } - - public static class QpidClientX509KeyManager extends X509ExtendedKeyManager - { - - private X509ExtendedKeyManager delegate; - private String alias; - - public QpidClientX509KeyManager(String alias, String keyStorePath, String keyStoreType, - String keyStorePassword, String keyManagerFactoryAlgorithmName) throws - GeneralSecurityException, - IOException - { - this.alias = alias; - KeyStore ks = getInitializedKeyStore(keyStorePath, keyStorePassword, keyStoreType); - KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithmName); - kmf.init(ks, keyStorePassword.toCharArray()); - this.delegate = (X509ExtendedKeyManager) kmf.getKeyManagers()[0]; - } - - public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) - { - return alias; - } - - public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) - { - return delegate.chooseServerAlias(keyType, issuers, socket); - } - - public X509Certificate[] getCertificateChain(String alias) - { - return delegate.getCertificateChain(alias); - } - - public String[] getClientAliases(String keyType, Principal[] issuers) - { - return new String[]{alias}; - } - - public PrivateKey getPrivateKey(String alias) - { - return delegate.getPrivateKey(alias); - } - - public String[] getServerAliases(String keyType, Principal[] issuers) - { - return delegate.getServerAliases(keyType, issuers); - } - - public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) - { - return alias; - } - - public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) - { - return delegate.chooseEngineServerAlias(keyType, issuers, engine); - } - } - - public static void removeSSLv3Support(final SSLSocket socket) - { - List<String> enabledProtocols = Arrays.asList(socket.getEnabledProtocols()); - if(enabledProtocols.contains(SSLV3_PROTOCOL)) - { - List<String> allowedProtocols = new ArrayList<>(enabledProtocols); - allowedProtocols.remove(SSLV3_PROTOCOL); - socket.setEnabledProtocols(allowedProtocols.toArray(new String[allowedProtocols.size()])); - } - } - -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java deleted file mode 100644 index 2b76344085..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeoutException; - -import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.Predicate; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Source; -import org.apache.qpid.amqp_1_0.type.Target; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; -import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.Detach; -import org.apache.qpid.amqp_1_0.type.transport.Error; -import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; - -public class Sender implements DeliveryStateHandler -{ - private static final long UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER = 1000l; - private static final long DEFAULT_CREDIT_TIMEOUT = 30000l; - - private SendingLinkEndpoint _endpoint; - private int _id; - private Session _session; - private int _windowSize; - private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>()); - private boolean _closed; - private Error _error; - private Runnable _remoteErrorTask; - private Outcome _defaultOutcome; - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, false); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - boolean synchronous) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - int window) throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO); - } - - - public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, - int window) throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, target, source, window, AcknowledgeMode.ALO); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - int window, AcknowledgeMode mode) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, window, mode, null); - } - - public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, - int window, AcknowledgeMode mode) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, target, source, window, mode, null); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled); - } - - public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr, - int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled) - throws SenderCreationException, ConnectionClosedException - { - this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled); - } - - protected void configureSource(org.apache.qpid.amqp_1_0.type.messaging.Source source) - { - - } - - protected void configureTarget(org.apache.qpid.amqp_1_0.type.messaging.Target target) - { - - } - - private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr) - { - org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source(); - source.setAddress(sourceAddr); - return source; - } - - private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable) - { - org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target(); - target.setAddress(targetAddr); - if(isDurable) - { - target.setDurable(TerminusDurability.UNSETTLED_STATE); - target.setExpiryPolicy(TerminusExpiryPolicy.NEVER); - } - return target; - } - - public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source, - int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled) - throws SenderCreationException, ConnectionClosedException - { - - _session = session; - _windowSize = window; - session.getConnection().checkNotClosed(); - configureSource(source); - configureTarget(target); - _endpoint = session.createSendingLinkEndpoint(linkName, target, source, mode, unsettled, this); - - synchronized(_endpoint.getLock()) - { - try - { - _endpoint.waitUntil(new Predicate() - { - @Override - public boolean isSatisfied() - { - return _endpoint.isAttached() || _endpoint.isDetached(); - } - }); - } - catch (TimeoutException e) - { - throw new SenderCreationException(e); - } - catch (InterruptedException e) - { - throw new SenderCreationException(e); - } - - if (session.getEndpoint().isEnded()) - { - throw new SenderCreationException("Session is closed while creating link, target: " + target.getAddress()); - } - if(_endpoint.getTarget()== null) - { - throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress()); - } - } - - _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener() - { - - @Override - public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) - { - _error = detach.getError(); - if(_error != null) - { - remoteError(); - } - super.remoteDetached(endpoint, detach); - } - }); - - _defaultOutcome = source.getDefaultOutcome(); - if(_defaultOutcome == null) - { - if(source.getOutcomes() == null || source.getOutcomes().length == 0) - { - _defaultOutcome = new Accepted(); - } - else if(source.getOutcomes().length == 1) - { - - final AMQPDescribedTypeRegistry describedTypeRegistry = _endpoint.getSession() - .getConnection() - .getDescribedTypeRegistry(); - - DescribedTypeConstructor constructor = describedTypeRegistry - .getConstructor(source.getOutcomes()[0]); - if(constructor != null) - { - Object impliedOutcome = constructor.construct(Collections.EMPTY_LIST); - if(impliedOutcome instanceof Outcome) - { - _defaultOutcome = (Outcome) impliedOutcome; - } - } - - } - } - } - - public Source getSource() - { - return _endpoint.getSource(); - } - - public Target getTarget() - { - return _endpoint.getTarget(); - } - - public void send(Message message) throws LinkDetachedException, TimeoutException - { - send(message, null, null); - } - - public void send(Message message, final OutcomeAction action) throws LinkDetachedException, TimeoutException - { - send(message, null, action); - } - - public void send(Message message, final Transaction txn) throws LinkDetachedException, TimeoutException - { - send(message, txn, null); - } - - public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException, TimeoutException - { - - List<Section> sections = message.getPayload(); - - Transfer xfr = new Transfer(); - - if(sections != null && !sections.isEmpty()) - { - SectionEncoder encoder = _session.getSectionEncoder(); - encoder.reset(); - - int sectionNumber = 0; - for(Section section : sections) - { - encoder.encodeObject(section); - } - - - Binary encoding = encoder.getEncoding(); - ByteBuffer payload = encoding.asByteBuffer(); - xfr.setPayload(payload); - } - if(message.getDeliveryTag() == null) - { - message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes())); - } - if(message.isResume()) - { - xfr.setResume(Boolean.TRUE); - } - if(message.getDeliveryState() != null) - { - xfr.setState(message.getDeliveryState()); - } - - xfr.setDeliveryTag(message.getDeliveryTag()); - //xfr.setSettled(_windowSize ==0); - if(txn != null) - { - xfr.setSettled(false); - TransactionalState deliveryState = new TransactionalState(); - deliveryState.setTxnId(txn.getTxnId()); - xfr.setState(deliveryState); - } - else - { - xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED); - } - final Object lock = _endpoint.getLock(); - - synchronized(lock) - { - - try - { - _endpoint.waitUntil(new Predicate() - { - @Override - public boolean isSatisfied() - { - return _endpoint.hasCreditToSend() || _endpoint.isDetached(); - } - }, getCreditTimeout()); - } - catch (InterruptedException e) - { - throw new TimeoutException("Interrupted while waiting for credit"); - } - - if(_endpoint.isDetached()) - { - throw new LinkDetachedException(_error); - } - if(action != null) - { - _outcomeActions.put(message.getDeliveryTag(), action); - } - _endpoint.transfer(xfr); - } - - if(_windowSize != 0) - { - try - { - _endpoint.waitUntil(new Predicate() - { - @Override - public boolean isSatisfied() - { - return _endpoint.getUnsettledCount() < _windowSize; - } - }, getUnsettledTimeout()); - } - catch (InterruptedException e) - { - throw new TimeoutException("Interrupted while waiting for the window to expand to allow sending"); - } - - } - - - } - - private long getCreditTimeout() - { - return _endpoint.getSyncTimeout() < DEFAULT_CREDIT_TIMEOUT ? DEFAULT_CREDIT_TIMEOUT : _endpoint.getSyncTimeout(); - } - - public void close() throws SenderClosingException - { - boolean unsettledDeliveries = false; - - if(_windowSize != 0) - { - long timeout = getUnsettledTimeout(); - - try - { - _endpoint.waitUntil(new Predicate() - { - @Override - public boolean isSatisfied() - { - return _endpoint.getUnsettledCount() == 0; - } - }, timeout); - } - catch (InterruptedException e) - { - unsettledDeliveries = true; - } - catch (TimeoutException e) - { - unsettledDeliveries = true; - } - - } - _session.removeSender(this); - _endpoint.setSource(null); - _endpoint.close(); - _closed = true; - - try - { - _endpoint.waitUntil(new Predicate() - { - @Override - public boolean isSatisfied() - { - return _endpoint.isDetached(); - } - }); - } - catch (TimeoutException e) - { - throw new SenderClosingException("Timed out attempting to detach link", e); - } - catch (InterruptedException e) - { - throw new SenderClosingException("Interrupted while attempting to detach link", e); - } - if(unsettledDeliveries && _endpoint.getUnsettledCount() > 0) - { - throw new SenderClosingException("Some messages may not have been received by the recipient"); - } - } - - private long getUnsettledTimeout() - { - long timeout = _endpoint.getSyncTimeout(); - - // give a generous timeout where there are unsettled messages - if(timeout < _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER) - { - timeout = _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER; - } - return timeout; - } - - public boolean isClosed() - { - return _closed; - } - - public void handle(Binary deliveryTag, DeliveryState state, Boolean settled) - { - OutcomeAction action; - if(state instanceof Outcome) - { - if((action = _outcomeActions.remove(deliveryTag)) != null) - { - - final Outcome outcome = (Outcome) state; - action.onOutcome(deliveryTag, (outcome == null && settled) ? _defaultOutcome : outcome); - } - if(!Boolean.TRUE.equals(settled)) - { - _endpoint.updateDisposition(deliveryTag, state, true); - } - } - else if(state instanceof TransactionalState) - { - if((action = _outcomeActions.remove(deliveryTag)) != null) - { - final Outcome outcome = ((TransactionalState) state).getOutcome(); - action.onOutcome(deliveryTag, outcome == null ? _defaultOutcome : outcome); - } - - } - else if(state == null && settled && (action = _outcomeActions.remove(deliveryTag)) != null) - { - action.onOutcome(deliveryTag, _defaultOutcome); - } - } - - public SendingLinkEndpoint getEndpoint() - { - return _endpoint; - } - - public Map<Binary, DeliveryState> getRemoteUnsettled() - { - return _endpoint.getInitialUnsettledMap(); - } - - public Session getSession() - { - return _session; - } - - - private void remoteError() - { - if(_remoteErrorTask != null) - { - Thread thread = new Thread(_remoteErrorTask); - thread.start(); - } - } - - - public void setRemoteErrorListener(Runnable listener) - { - _remoteErrorTask = listener; - } - - public Error getError() - { - return _error; - } - - public class SenderCreationException extends Exception - { - public SenderCreationException(Throwable e) - { - super(e); - } - - public SenderCreationException(String e) - { - super(e); - - } - } - - public class SenderClosingException extends Exception - { - public SenderClosingException(final String message, final Throwable cause) - { - super(message, cause); - } - - public SenderClosingException(Throwable e) - { - super(e); - } - - public SenderClosingException(final String message) - { - super(message); - } - } - - public static interface OutcomeAction - { - public void onOutcome(Binary deliveryTag, Outcome outcome); - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java deleted file mode 100644 index 2c3857a689..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ /dev/null @@ -1,462 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; -import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; -import org.apache.qpid.amqp_1_0.transport.SessionState; -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; -import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; -import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -public class Session -{ - private SessionEndpoint _endpoint; - private List<Receiver> _receivers = new ArrayList<Receiver>(); - private List<Sender> _senders = new ArrayList<Sender>(); - private SectionEncoder _sectionEncoder; - private SectionDecoder _sectionDecoder; - private TransactionController _sessionLocalTC; - private Connection _connection; - - public Session(final Connection connection, String name) throws ChannelsExhaustedException - { - _connection = connection; - _endpoint = connection.getEndpoint().createSession(name); - if(_endpoint == null) - { - throw new ChannelsExhaustedException("Cannot create session as all channels are in use"); - } - _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); - _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry()); - } - - - public Sender createSender(final String targetName) - throws Sender.SenderCreationException, ConnectionClosedException - { - - final String sourceName = UUID.randomUUID().toString(); - return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false); - - } - - - public Sender createSender(final String targetName, final SourceConfigurator configurator) - throws Sender.SenderCreationException, ConnectionClosedException - { - final String sourceName = UUID.randomUUID().toString(); - return createSender(sourceName, targetName, configurator); - } - - public Sender createSender(final String sourceName, final String targetName, final SourceConfigurator configurator) - throws Sender.SenderCreationException, ConnectionClosedException - { - - return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false) - { - @Override - protected void configureSource(final Source source) - { - configurator.configureSource(source); - } - }; - - } - - public Sender createSender(final String targetName, int window) - throws Sender.SenderCreationException, ConnectionClosedException - { - final String sourceName = UUID.randomUUID().toString(); - return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window); - - } - - public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName) - throws Sender.SenderCreationException, ConnectionClosedException - { - return createSender(targetName, window, mode, linkName, false, null); - } - - public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, - boolean isDurable, Map<Binary, Outcome> unsettled) - throws Sender.SenderCreationException, ConnectionClosedException - { - return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName, - targetName, null, window, mode, isDurable, unsettled); - - } - - public SendingLinkEndpoint createSendingLinkEndpoint(final String linkName, - final Target target, - final Source source, - AcknowledgeMode mode, - Map<Binary, Outcome> unsettled, - final DeliveryStateHandler deliveryStateHandler) - { - SessionEndpoint endpoint = this.getEndpoint(); - synchronized(endpoint.getLock()) - { - SendingLinkEndpoint link = endpoint.createSendingLinkEndpoint(linkName, source, target, - unsettled, deliveryStateHandler); - - switch(mode) - { - case ALO: - link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - link.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - break; - case AMO: - link.setSendingSettlementMode(SenderSettleMode.SETTLED); - break; - case EO: - link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - link.setReceivingSettlementMode(ReceiverSettleMode.SECOND); - break; - - } - - link.attach(); - return link; - } - } - - public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException - { - return createReceiver(sourceAddr, UUID.randomUUID().toString(), null, AcknowledgeMode.ALO); - } - - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode) - throws ConnectionErrorException - { - return createReceiver(queue, UUID.randomUUID().toString(), null, mode); - } - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName) - throws ConnectionErrorException - { - return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName); - } - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable) - throws ConnectionErrorException - { - return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName, isDurable); - } - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable, - Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled) - throws ConnectionErrorException - { - return createReceiver(queue, (DistributionMode) null, mode, linkName, isDurable, filters, unsettled); - } - - public Receiver createReceiver(final String queue, String targetName, final AcknowledgeMode mode, String linkName, boolean isDurable, - Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled) - throws ConnectionErrorException - { - return createReceiver(queue, targetName, null, mode, linkName, isDurable, filters, unsettled); - } - - - public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, - boolean isDurable, Map<Binary, Outcome> unsettled) - throws ConnectionErrorException - { - return createReceiver(queue, UUID.randomUUID().toString(), null, mode, linkName, isDurable, unsettled); - } - - - private synchronized Receiver createReceiver(final String sourceAddr, - final String targetAddr, - DistributionMode mode) - throws ConnectionErrorException - { - return createReceiver(sourceAddr, targetAddr, mode, AcknowledgeMode.ALO); - } - - - private synchronized Receiver createReceiver(final String sourceAddr, - final String targetAddr, - DistributionMode mode, - final AcknowledgeMode ackMode) - throws ConnectionErrorException - { - return createReceiver(sourceAddr, targetAddr, mode, ackMode, null); - } - - private synchronized Receiver createReceiver(final String sourceAddr, - final String targetAddr, - DistributionMode mode, - final AcknowledgeMode ackMode, - String linkName) - throws ConnectionErrorException - { - return createReceiver(sourceAddr, targetAddr, mode, ackMode, linkName, false); - } - - private synchronized Receiver createReceiver(final String sourceAddr, - final String targetAddr, - DistributionMode mode, - final AcknowledgeMode ackMode, - String linkName, - boolean isDurable) - throws ConnectionErrorException - { - return createReceiver(sourceAddr, targetAddr, mode, ackMode, linkName, isDurable, null); - } - - private synchronized Receiver createReceiver(final String sourceAddr, - final String targetAddr, - DistributionMode mode, - final AcknowledgeMode ackMode, - String linkName, - boolean isDurable, - Map<Binary, Outcome> unsettled) - throws ConnectionErrorException - { - return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled); - } - - public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, - final AcknowledgeMode ackMode, String linkName, boolean isDurable, - Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled) - throws ConnectionErrorException - { - return createReceiver(sourceAddr, UUID.randomUUID().toString(), mode, ackMode, linkName, isDurable, filters, unsettled); - } - - public synchronized Receiver createReceiver(final String sourceAddr, String targetAddr, DistributionMode mode, - final AcknowledgeMode ackMode, String linkName, boolean isDurable, - Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled) - throws ConnectionErrorException - { - - final Target target = new Target(); - target.setAddress(targetAddr); - final Source source = new Source(); - source.setAddress(sourceAddr); - source.setDistributionMode(mode); - source.setFilter(filters); - - if(linkName == null) - { - linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")"; - } - - final Receiver receiver = - new Receiver(this, linkName, - target, source, ackMode, isDurable, unsettled); - _receivers.add(receiver); - - return receiver; - - } - - public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws ConnectionErrorException - { - return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.COPY); - } - - public synchronized Receiver createMovingReceiver(final String sourceAddr) throws ConnectionErrorException - { - return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.MOVE); - } - - public synchronized Receiver createMovingReceiver(final String sourceAddr, final String targetAddr) throws ConnectionErrorException - { - return createReceiver(sourceAddr, UUID.randomUUID().toString(), StdDistMode.MOVE); - } - - public Receiver createTemporaryQueueReceiver() throws AmqpErrorException, ConnectionErrorException - { - Source source = new Source(); - source.setDynamic(true); - - final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(), - source, AcknowledgeMode.ALO); - _receivers.add(receiver); - return receiver; - } - - public Sender createTemporaryQueueSender() throws Sender.SenderCreationException, ConnectionClosedException - { - Target target = new Target(); - target.setDynamic(true); - - final Sender sender; - sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target, - new Source(), 0, AcknowledgeMode.ALO); - _senders.add(sender); - return sender; - } - - - - public SessionEndpoint getEndpoint() - { - return _endpoint; - } - - public synchronized void close() - { - try - { - for(Sender sender : new ArrayList<Sender>(_senders)) - { - sender.close(); - } - for(Receiver receiver : new ArrayList<Receiver>(_receivers)) - { - receiver.detach(); - } - if(_sessionLocalTC != null) - { - _sessionLocalTC.close(); - } - _endpoint.end(); - } - catch (Sender.SenderClosingException e) - { -// TODO - e.printStackTrace(); - } - - //TODO - - } - - void removeSender(Sender sender) - { - _senders.remove(sender); - } - - void removeReceiver(Receiver receiver) - { - _receivers.remove(receiver); - } - - public SectionEncoder getSectionEncoder() - { - return _sectionEncoder; - } - - public SectionDecoder getSectionDecoder() - { - return _sectionDecoder; - } - - - public Transaction createSessionLocalTransaction() throws LinkDetachedException - { - TransactionController localController = getSessionLocalTransactionController(); - return localController.beginTransaction(); - } - - private TransactionController getSessionLocalTransactionController() - { - if(_sessionLocalTC == null) - { - _sessionLocalTC = createSessionLocalTransactionController(); - } - return _sessionLocalTC; - } - - private TransactionController createSessionLocalTransactionController() - { - String name = "txnControllerLink"; - SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN, - TxnCapability.MULTI_TXNS_PER_SSN); - tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - tcLinkEndpoint.attach(); - return new TransactionController(this, tcLinkEndpoint); - } - - - public Message receive() - { - while(getEndpoint().getState() == SessionState.ACTIVE) - { - synchronized (getEndpoint().getLock()) - { - try - { - for(Receiver r : _receivers) - { - Message m = r.receive(false); - if(m != null) - return m; - } - getEndpoint().getLock().wait(); - } - catch (InterruptedException e) - { - } - } - } - return null; - } - - public Connection getConnection() - { - return _connection; - } - - public void awaitActive() - { - synchronized(getEndpoint().getLock()) - { - while(!getEndpoint().isEnded() && !getEndpoint().isActive()) - { - try - { - getEndpoint().getLock().wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - - public static interface SourceConfigurator - { - public void configureSource(final Source source); - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvider.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvider.java deleted file mode 100644 index e3a982d8a9..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvider.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; - -import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; -import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; -import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.type.FrameBody; -import org.apache.qpid.amqp_1_0.type.SaslFrameBody; - -class TCPTransportProvider implements TransportProvider -{ - private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); - - private Socket _socket; - private final String _transport; - - // Defines read socket timeout in milliseconds. A value of 0 means that the socket - // read will block forever. Default value is set to 10000, which is 10 seconds. - private int _readTimeout = Integer.getInteger("qpid.connection_read_timeout", 10000); - - // Defines the max idle read timeout in milliseconds before the connection is closed down in - // the event of a SocketTimeoutException. A value of -1L will disable idle read timeout checking. - // Default value is set to -1L, which means disable idle read checks. - private long _readIdleTimeout = Long.getLong("qpid.connection_read_idle_timeout", -1L); - private final AtomicLong _threadNameIndex = new AtomicLong(); - - public TCPTransportProvider(final String transport) - { - _transport = transport; - } - - @Override - public void connect(final ConnectionEndpoint conn, - final String address, - final int port, - final SSLContext sslContext, - final SSLOptions sslOptions, final ExceptionHandler exceptionHandler) throws ConnectionException - { - try - { - if(sslContext != null) - { - final SSLSocketFactory socketFactory = sslContext.getSocketFactory(); - SSLSocket sslSocket = (SSLSocket) socketFactory.createSocket(address, port); - if(sslOptions == null) - { - SSLUtil.removeSSLv3Support(sslSocket); - } - else - { - final List<String> enabledProtocols = sslOptions.getEnabledProtocols(); - final List<String> disabledProtocols = sslOptions.getDisabledProtocols(); - - if(enabledProtocols != null && !enabledProtocols.isEmpty()) - { - final Set<String> supportedSuites = - new HashSet<>(Arrays.asList(sslSocket.getSupportedProtocols())); - supportedSuites.retainAll(enabledProtocols); - sslSocket.setEnabledProtocols(supportedSuites.toArray(new String[supportedSuites.size()])); - } - - if(disabledProtocols != null && !disabledProtocols.isEmpty()) - { - final Set<String> enabledSuites = new HashSet<>(Arrays.asList(sslSocket.getEnabledProtocols())); - enabledSuites.removeAll(disabledProtocols); - sslSocket.setEnabledProtocols(enabledSuites.toArray(new String[enabledSuites.size()])); - } - } - sslSocket.startHandshake(); - conn.setExternalPrincipal(sslSocket.getSession().getLocalPrincipal()); - _socket=sslSocket; - } - else - { - _socket = new Socket(address, port); - } - // set socket read timeout - _socket.setSoTimeout(_readTimeout); - - conn.setRemoteAddress(_socket.getRemoteSocketAddress()); - - ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(conn); - - ConnectionHandler.BytesSource src; - - if(conn.requiresSASL()) - { - ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(conn); - - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)3, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(saslOut.asFrameSource(),conn.getDescribedTypeRegistry()), - new ConnectionHandler.HeaderBytesSource(conn, (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry()) - ); - - conn.setSaslFrameOutput(saslOut); - } - else - { - src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(conn,(byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte)0, - (byte)1, - (byte)0, - (byte)0), - new ConnectionHandler.FrameToBytesSourceAdapter(out.asFrameSource(),conn.getDescribedTypeRegistry()) - ); - } - - - final OutputStream outputStream = _socket.getOutputStream(); - ConnectionHandler.BytesOutputHandler outputHandler = - new ConnectionHandler.BytesOutputHandler(outputStream, src, conn, exceptionHandler); - long threadIndex = _threadNameIndex.getAndIncrement(); - Thread outputThread = new Thread(outputHandler, "QpidConnectionOutputThread-"+threadIndex); - - outputThread.setDaemon(true); - outputThread.start(); - conn.setFrameOutputHandler(out); - - - final ConnectionHandler handler = new ConnectionHandler(conn); - final InputStream inputStream = _socket.getInputStream(); - - Thread inputThread = new Thread(new Runnable() - { - - public void run() - { - try - { - doRead(conn, handler, inputStream); - } - finally - { - if(conn.closedForInput() && conn.closedForOutput()) - { - close(); - } - } - } - },"QpidConnectionInputThread-"+threadIndex); - - inputThread.setDaemon(true); - inputThread.start(); - - } - catch (IOException e) - { - throw new ConnectionException(e); - } - } - - @Override - public void close() - { - try - { - _socket.close(); - } - catch (IOException e) - { - RAW_LOGGER.log(Level.WARNING, "Unexpected Error during TCPTransportProvider socket close", e); - } - } - - private void doRead(final ConnectionEndpoint conn, final ConnectionHandler handler, final InputStream inputStream) - { - byte[] buf = new byte[2<<15]; - - - try - { - int read; - boolean done = false; - long lastReadTime = System.currentTimeMillis(); - while(!handler.isDone()) - { - try - { - read = inputStream.read(buf); - if(read == -1) - { - break; - } - lastReadTime = System.currentTimeMillis(); - - ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); - while(bbuf.hasRemaining() && !handler.isDone()) - { - handler.parse(bbuf); - } - - } - catch(SocketTimeoutException e) - { - // Note that a SocketTimeoutException could only occur if _readTimeout > 0. - // Only perform idle read timeout checking if _readIdleTimeout is greater than -1 - if(_readIdleTimeout > -1 && (System.currentTimeMillis() - lastReadTime >= _readIdleTimeout)){ - // break out of while loop and close down connection - break; - } - } - } - if(!handler.isDone()) - { - conn.inputClosed(); - if(conn.getConnectionEventListener() != null) - { - conn.getConnectionEventListener().closeReceived(); - } - } - } - catch (IOException e) - { - conn.inputClosed(); - RAW_LOGGER.log(Level.INFO, "IO Error reading from connection", e); - } - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java deleted file mode 100644 index a29bbcd232..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.util.Arrays; -import java.util.Collection; - -public class TCPTransportProviderFactory implements TransportProviderFactory -{ - @Override - public Collection<String> getSupportedTransports() - { - return Arrays.asList("amqp","amqps"); - } - - @Override - public TransportProvider getProvider(final String transport) - { - return new TCPTransportProvider(transport); - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java deleted file mode 100644 index e67f9e2fce..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; - -public class Transaction -{ - private TransactionController _transactionController; - private Binary _txnId; - - Transaction(final TransactionController transactionController, Binary txnId) - { - _transactionController = transactionController; - _txnId = txnId; - } - - public void commit() throws LinkDetachedException - { - _transactionController.commit(this); - } - - public void rollback() throws LinkDetachedException - { - _transactionController.rollback(this); - } - - public Binary getTxnId() - { - return _txnId; - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java deleted file mode 100644 index 4a4cce1146..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.transaction.Declare; -import org.apache.qpid.amqp_1_0.type.transaction.Declared; -import org.apache.qpid.amqp_1_0.type.transaction.Discharge; -import org.apache.qpid.amqp_1_0.type.transport.*; -import org.apache.qpid.amqp_1_0.type.transport.Error; - - -public class TransactionController implements DeliveryStateHandler -{ - private static final Binary DELIVERY_TAG = new Binary(new byte[]{(byte) 0}); - private SendingLinkEndpoint _endpoint; - private Session _session; - private volatile DeliveryState _state; - private boolean _received; - private Error _error; - - public TransactionController(Session session, SendingLinkEndpoint tcLinkEndpoint) - { - _session = session; - _endpoint = tcLinkEndpoint; - _endpoint.setDeliveryStateHandler(this); - _endpoint.setLinkEventListener(new SendingLinkListener() - { - @Override - public void flowStateChanged() - { - // ignore - } - - @Override - public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) - { - TransactionController.this.remoteDetached(detach); - } - }); - } - - public Transaction beginTransaction() throws LinkDetachedException - { - - - Binary txnId = declare(); - return new Transaction(this, txnId); - } - - private Binary declare() throws LinkDetachedException - { - SectionEncoder encoder = _session.getSectionEncoder(); - - - AmqpValue section = new AmqpValue(new Declare()); - - - Transfer transfer = new Transfer(); - transfer.setPayload(section.encode(encoder).asByteBuffer()); - transfer.setDeliveryTag(DELIVERY_TAG); - transfer.setSettled(Boolean.FALSE); - final Object lock = _endpoint.getLock(); - synchronized(lock) - { - while(!_endpoint.hasCreditToSend()) - { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - - } - } - _state = null; - _received = false; - _endpoint.transfer(transfer); - - //TODO - rationalise sending of flows - // _endpoint.sendFlow(); - } - waitForResponse(); - - - return ((Declared) _state).getTxnId(); - } - - private void waitForResponse() throws LinkDetachedException - { - final Object lock = _endpoint.getLock(); - synchronized (lock) - { - while(!_received && !_endpoint.isDetached()) - { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - - } - } - } - if(!_received && _endpoint.isDetached()) - { - throw new LinkDetachedException(_error); - } - } - - private void remoteDetached(Detach detach) - { - final Object lock = _endpoint.getLock(); - synchronized (lock) - { - if (detach != null && detach.getError() != null) - { - _error = detach.getError(); - lock.notifyAll(); - } - } - } - - - public void commit(final Transaction transaction) throws LinkDetachedException - { - discharge(transaction.getTxnId(), false); - } - - public void rollback(final Transaction transaction) throws LinkDetachedException - { - discharge(transaction.getTxnId(), true); - } - - private void discharge(final Binary txnId, final boolean fail) throws LinkDetachedException - { - Discharge discharge = new Discharge(); - discharge.setTxnId(txnId); - discharge.setFail(fail); - SectionEncoder encoder = _session.getSectionEncoder(); - - - AmqpValue section = new AmqpValue(discharge); - - Transfer transfer = new Transfer(); - transfer.setPayload(section.encode(encoder).asByteBuffer()); - transfer.setDeliveryTag(DELIVERY_TAG); - transfer.setSettled(Boolean.FALSE); - - final Object lock = _endpoint.getLock(); - synchronized(lock) - { - while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached()) - { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - - } - } - if(_endpoint.isDetached()) - { - throw new LinkDetachedException(_error); - } - _state = null; - _received = false; - _endpoint.transfer(transfer); - - //TODO - rationalise sending of flows - // _endpoint.sendFlow(); - } - waitForResponse(); - - - } - - public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled) - { - synchronized(this) - { - _state = state; - _received = true; - - if(!Boolean.TRUE.equals(settled)) - { - _endpoint.updateDisposition(deliveryTag, state, true); - } - - notifyAll(); - } - } - - public void close() - { - _endpoint.close(); - } -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java deleted file mode 100644 index e8ea53b451..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import javax.net.ssl.SSLContext; - -import org.apache.qpid.amqp_1_0.framing.ExceptionHandler; -import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; - -public interface TransportProvider -{ - void connect(ConnectionEndpoint conn, - String address, - int port, - SSLContext sslContext, - final SSLOptions sslOptions, - ExceptionHandler exceptionHandler) throws ConnectionException; - - void close(); -} diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java deleted file mode 100644 index 82999c5ccc..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.util.Collection; - -public interface TransportProviderFactory -{ - Collection<String> getSupportedTransports(); - TransportProvider getProvider(String transport); -} diff --git a/qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory b/qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory deleted file mode 100644 index ffde030b30..0000000000 --- a/qpid/java/amqp-1-0-client/src/main/resources/META-INF/services/org.apache.qpid.amqp_1_0.client.TransportProviderFactory +++ /dev/null @@ -1,19 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -org.apache.qpid.amqp_1_0.client.TCPTransportProviderFactory
\ No newline at end of file |
