From 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 28 Feb 2013 16:14:30 +0000 Subject: Update from trunk r1375509 through r1450773 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68 --- java/amqp-1-0-client/build.xml | 3 + java/amqp-1-0-client/example/build.xml | 28 ++ .../org/apache/qpid/amqp_1_0/client/Command.java | 43 ++ .../java/org/apache/qpid/amqp_1_0/client/Demo.java | 407 ++++++++++++++++ .../java/org/apache/qpid/amqp_1_0/client/Dump.java | 136 ++++++ .../apache/qpid/amqp_1_0/client/Filereceiver.java | 347 ++++++++++++++ .../apache/qpid/amqp_1_0/client/Filesender.java | 296 ++++++++++++ .../org/apache/qpid/amqp_1_0/client/Receive.java | 246 ++++++++++ .../org/apache/qpid/amqp_1_0/client/Request.java | 249 ++++++++++ .../org/apache/qpid/amqp_1_0/client/Respond.java | 347 ++++++++++++++ .../java/org/apache/qpid/amqp_1_0/client/Send.java | 244 ++++++++++ .../java/org/apache/qpid/amqp_1_0/client/Util.java | 529 +++++++++++++++++++++ java/amqp-1-0-client/resources/LICENSE | 204 ++++++++ java/amqp-1-0-client/resources/NOTICE | 5 + java/amqp-1-0-client/resources/README.txt | 7 + .../org/apache/qpid/amqp_1_0/client/Command.java | 43 -- .../apache/qpid/amqp_1_0/client/Connection.java | 115 +---- .../java/org/apache/qpid/amqp_1_0/client/Demo.java | 407 ---------------- .../java/org/apache/qpid/amqp_1_0/client/Dump.java | 136 ------ .../apache/qpid/amqp_1_0/client/Filereceiver.java | 347 -------------- .../apache/qpid/amqp_1_0/client/Filesender.java | 296 ------------ .../org/apache/qpid/amqp_1_0/client/ReadBytes.java | 77 --- .../org/apache/qpid/amqp_1_0/client/Receive.java | 246 ---------- .../org/apache/qpid/amqp_1_0/client/Receiver.java | 35 +- .../org/apache/qpid/amqp_1_0/client/Request.java | 249 ---------- .../org/apache/qpid/amqp_1_0/client/Respond.java | 347 -------------- .../java/org/apache/qpid/amqp_1_0/client/Send.java | 244 ---------- .../org/apache/qpid/amqp_1_0/client/SendBytes.java | 331 ------------- .../java/org/apache/qpid/amqp_1_0/client/Util.java | 529 --------------------- 29 files changed, 3136 insertions(+), 3357 deletions(-) create mode 100644 java/amqp-1-0-client/example/build.xml create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java create mode 100644 java/amqp-1-0-client/resources/LICENSE create mode 100644 java/amqp-1-0-client/resources/NOTICE create mode 100644 java/amqp-1-0-client/resources/README.txt delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java (limited to 'java/amqp-1-0-client') diff --git a/java/amqp-1-0-client/build.xml b/java/amqp-1-0-client/build.xml index 031809a380..cd7654ed15 100644 --- a/java/amqp-1-0-client/build.xml +++ b/java/amqp-1-0-client/build.xml @@ -23,6 +23,9 @@ + + + diff --git a/java/amqp-1-0-client/example/build.xml b/java/amqp-1-0-client/example/build.xml new file mode 100644 index 0000000000..89bba729dd --- /dev/null +++ b/java/amqp-1-0-client/example/build.xml @@ -0,0 +1,28 @@ + + + + + + + + + diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java new file mode 100644 index 0000000000..3bb26744c4 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import java.lang.reflect.InvocationTargetException; + +public class Command +{ + public static void main(String[] args) throws + ClassNotFoundException, + NoSuchMethodException, + InvocationTargetException, + IllegalAccessException, + InstantiationException + { + String name = args[0]; + String[] cmdArgs = new String[args.length-1]; + System.arraycopy(args,1,cmdArgs,0,args.length-1); + name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase(); + Class clazz = (Class) Class.forName(name); + Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs); + util.run(); + + } +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java new file mode 100644 index 0000000000..b58ce6bfe5 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java @@ -0,0 +1,407 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Demo extends Util +{ + private static final String USAGE_STRING = "demo [options] [ ...]\n\nOptions:"; + private static final String OPCODE = "opcode"; + private static final String ACTION = "action"; + private static final String MESSAGE_ID = "message-id"; + private static final String VENDOR = "vendor"; + private static final String LOG = "log"; + private static final String RECEIVED = "received"; + private static final String TEST = "test"; + private static final String APACHE = "apache"; + private static final String SENT = "sent"; + private static final String LINK_REF = "link-ref"; + private static final String HOST = "host"; + private static final String PORT = "port"; + private static final String SASL_USER = "sasl-user"; + private static final String SASL_PASSWORD = "sasl-password"; + private static final String ROLE = "role"; + private static final String ADDRESS = "address"; + private static final String SENDER = "sender"; + private static final String SEND_MESSAGE = "send-message"; + private static final String ANNOUNCE = "announce"; + private static final String MESSAGE_VENDOR = "message-vendor"; + private static final String CREATE_LINK = "create-link"; + + public static void main(String[] args) + { + new Demo(args).run(); + } + + public Demo(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected boolean hasWindowSizeOption() + { + return false; + } + + public void run() + { + + try + { + + final String vendor = getArgs()[0]; + final String queue = "control"; + + String message = ""; + + Connection conn = newConnection(); + Session session = conn.createSession(); + + + Receiver responseReceiver; + + responseReceiver = session.createTemporaryQueueReceiver(); + + + + + responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + + + Sender s = session.createSender(queue, getWindowSize(), getMode()); + + + Properties properties = new Properties(); + properties.setMessageId(java.util.UUID.randomUUID()); + properties.setReplyTo(responseReceiver.getAddress()); + + HashMap appPropMap = new HashMap(); + ApplicationProperties appProperties = new ApplicationProperties(appPropMap); + + appPropMap.put(OPCODE, ANNOUNCE); + appPropMap.put(VENDOR, vendor); + appPropMap.put(ADDRESS,responseReceiver.getAddress()); + + AmqpValue amqpValue = new AmqpValue(message); + Section[] sections = { properties, appProperties, amqpValue}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1); + + Map sendingLinks = new HashMap(); + Map receivingLinks = new HashMap(); + + + boolean done = false; + + while(!done) + { + boolean wait = true; + Message m = responseReceiver.receive(false); + if(m != null) + { + List
payload = m.getPayload(); + wait = false; + ApplicationProperties props = m.getApplicationProperties(); + Map map = props.getValue(); + String op = (String) map.get(OPCODE); + if("reset".equals(op)) + { + for(Sender sender : sendingLinks.values()) + { + try + { + sender.close(); + Session session1 = sender.getSession(); + session1.close(); + session1.getConnection().close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + for(Receiver receiver : receivingLinks.values()) + { + try + { + receiver.close(); + receiver.getSession().close(); + receiver.getSession().getConnection().close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + sendingLinks.clear(); + receivingLinks.clear(); + } + else if(CREATE_LINK.equals(op)) + { + Object linkRef = map.get(LINK_REF); + String host = (String) map.get(HOST); + Object o = map.get(PORT); + int port = Integer.parseInt(String.valueOf(o)); + String user = (String) map.get(SASL_USER); + String password = (String) map.get(SASL_PASSWORD); + String role = (String) map.get(ROLE); + String address = (String) map.get(ADDRESS); + System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password); + try{ + + + Connection conn2 = new Connection(host, port, user, password, host); + Session session2 = conn2.createSession(); + if(sendingLinks.containsKey(linkRef)) + { + try + { + sendingLinks.remove(linkRef).close(); + } + catch (Exception e) + { + + } + } + if(receivingLinks.containsKey(linkRef)) + { + try + { + receivingLinks.remove(linkRef).close(); + } + catch (Exception e) + { + + } + } + if(SENDER.equals(role)) + { + + System.err.println("%%% Creating sender (" + linkRef + ")"); + Sender sender = session2.createSender(address); + sendingLinks.put(linkRef, sender); + } + else + { + + System.err.println("%%% Creating receiver (" + linkRef + ")"); + Receiver receiver2 = session2.createReceiver(address); + receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + receivingLinks.put(linkRef, receiver2); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + else if(SEND_MESSAGE.equals(op)) + { + Sender sender = sendingLinks.get(map.get(LINK_REF)); + Properties m2props = new Properties(); + Object messageId = map.get(MESSAGE_ID); + m2props.setMessageId(messageId); + Map m2propmap = new HashMap(); + m2propmap.put(OPCODE, TEST); + m2propmap.put(VENDOR, vendor); + ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); + Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); + sender.send(m2); + + Map m3propmap = new HashMap(); + m3propmap.put(OPCODE, LOG); + m3propmap.put(ACTION, SENT); + m3propmap.put(MESSAGE_ID, messageId); + m3propmap.put(VENDOR, vendor); + m3propmap.put(MESSAGE_VENDOR, vendor); + + + Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), + new AmqpValue("AMQP-"+messageId))); + s.send(m3); + + } + + responseReceiver.acknowledge(m); + } + else + { + for(Map.Entry entry : receivingLinks.entrySet()) + { + m = entry.getValue().receive(false); + if(m != null) + { + wait = false; + + System.err.println("%%% Received message from " + entry.getKey()); + + Properties mp = m.getProperties(); + ApplicationProperties ap = m.getApplicationProperties(); + + Map m3propmap = new HashMap(); + m3propmap.put(OPCODE, LOG); + m3propmap.put(ACTION, RECEIVED); + m3propmap.put(MESSAGE_ID, mp.getMessageId()); + m3propmap.put(VENDOR, vendor); + m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR)); + + Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), + new AmqpValue("AMQP-"+mp.getMessageId()))); + s.send(m3); + + entry.getValue().acknowledge(m); + } + + } + } + + if(wait) + { + try + { + Thread.sleep(500l); + } + catch (InterruptedException e) + { + e.printStackTrace(); //TODO. + } + } + + } + + + + + + + + + + s.close(); + session.close(); + conn.close(); + + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return false; + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java new file mode 100644 index 0000000000..65d27b21f8 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.commons.cli.Options; + +public class Dump extends Util +{ + private static final String USAGE_STRING = "dump [options]
\n\nOptions:"; + + + protected Dump(String[] args) + { + super(args); + } + + public static void main(String[] args) + { + new Dump(args).run(); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasLinkNameOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasSizeOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasBlockOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasStdInOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasTxnOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasModeOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasCountOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected void printUsage(Options options) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + + Sender s = session.createSender(queue, 10); + + Message message = new Message("dump me"); + message.setDeliveryTag(new Binary("dump".getBytes())); + + s.send(message); + + s.close(); + session.close(); + conn.close(); + + } catch (Connection.ConnectionException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java new file mode 100644 index 0000000000..4d98655ad2 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java @@ -0,0 +1,347 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; + +public class Filereceiver extends Util +{ + private static final String USAGE_STRING = "filereceiver [options]
\n\nOptions:"; + + protected Filereceiver(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return false; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + final String directoryName = getArgs()[1]; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + final File directory = new File(directoryName); + if(directory.isDirectory() && directory.canWrite()) + { + File tmpDirectory = new File(directoryName, ".tmp"); + if(!tmpDirectory.exists()) + { + tmpDirectory.mkdir(); + } + + String[] unsettledFiles = tmpDirectory.list(); + + Map unsettled = new HashMap(); + final Map unsettledFileNames = new HashMap(); + + Accepted accepted = new Accepted(); + + for(String fileName : unsettledFiles) + { + File theFile = new File(tmpDirectory, fileName); + if(theFile.isFile()) + { + if(fileName.startsWith("~") && fileName.endsWith("~")) + { + theFile.delete(); + } + else + { + int splitPoint = fileName.indexOf("."); + String deliveryTagStr = fileName.substring(0,splitPoint); + String actualFileName = fileName.substring(splitPoint+1); + + byte[] bytes = new byte[deliveryTagStr.length()/2]; + + + for(int i = 0; i < bytes.length; i++) + { + char c = deliveryTagStr.charAt(2*i); + char d = deliveryTagStr.charAt(1+(2*i)); + + bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) + | (d <= '9' ? d - '0' : d - 'W')); + + } + Binary deliveryTag = new Binary(bytes); + unsettled.put(deliveryTag, accepted); + unsettledFileNames.put(deliveryTag, fileName); + } + } + + } + + Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), + unsettled); + + Map remoteUnsettled = r.getRemoteUnsettled(); + + for(Map.Entry entry : unsettledFileNames.entrySet()) + { + if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) + { + + File tmpFile = new File(tmpDirectory, entry.getValue()); + final File dest = new File(directory, + entry.getValue().substring(entry.getValue().indexOf(".") + 1)); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + + tmpFile.renameTo(dest); + } + } + + + int credit = 10; + + r.setCredit(UnsignedInteger.valueOf(credit), true); + + + int received = 0; + Message m = null; + do + { + m = isBlock() && received == 0 ? r.receive() : r.receive(10000); + if(m != null) + { + if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) + { + final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); + final File unsettledFile = new File(tmpDirectory, + tmpFileName); + r.acknowledge(m, new Receiver.SettledAction() + { + public void onSettled(final Binary deliveryTag) + { + int splitPoint = tmpFileName.indexOf("."); + + String fileName = tmpFileName.substring(splitPoint+1); + + final File dest = new File(directory, fileName); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + unsettledFile.renameTo(dest); + unsettledFileNames.remove(deliveryTag); + } + }); + } + else + { + received++; + List
sections = m.getPayload(); + Binary deliveryTag = m.getDeliveryTag(); + StringBuilder tagNameBuilder = new StringBuilder(); + + ByteBuffer dtbuf = deliveryTag.asByteBuffer(); + while(dtbuf.hasRemaining()) + { + tagNameBuilder.append(String.format("%02x", dtbuf.get())); + } + + + ApplicationProperties properties = null; + List data = new ArrayList(); + int totalSize = 0; + for(Section section : sections) + { + if(section instanceof ApplicationProperties) + { + properties = (ApplicationProperties) section; + } + else if(section instanceof AmqpValue) + { + AmqpValue value = (AmqpValue) section; + if(value.getValue() instanceof Binary) + { + Binary binary = (Binary) value.getValue(); + data.add(binary); + totalSize += binary.getLength(); + + } + else + { + // TODO exception + } + } + else if(section instanceof Data) + { + Data value = (Data) section; + Binary binary = value.getValue(); + data.add(binary); + totalSize += binary.getLength(); + + } + } + if(properties != null) + { + final String fileName = (String) properties.getValue().get("filename"); + byte[] fileData = new byte[totalSize]; + ByteBuffer buf = ByteBuffer.wrap(fileData); + int offset = 0; + for(Binary bin : data) + { + buf.put(bin.asByteBuffer()); + } + File outputFile = new File(tmpDirectory, "~"+fileName+"~"); + if(outputFile.exists()) + { + outputFile.delete(); + } + FileOutputStream fos = new FileOutputStream(outputFile); + fos.write(fileData); + fos.flush(); + fos.close(); + + final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + + fileName); + outputFile.renameTo(unsettledFile); + r.acknowledge(m, new Receiver.SettledAction() + { + public void onSettled(final Binary deliveryTag) + { + final File dest = new File(directory, fileName); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + unsettledFile.renameTo(dest); + + } + }); + + } + } + } + } + while(m != null); + + + r.close(); + } + else + { + System.err.println("No such directory: " + directoryName); + } + session.close(); + conn.close(); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); + } + catch (FileNotFoundException e) + { + e.printStackTrace(); //TODO. + } + catch (IOException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + public static void main(String[] args) + { + new Filereceiver(args).run(); + } +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java new file mode 100644 index 0000000000..46e6ba537f --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java @@ -0,0 +1,296 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; + +public class Filesender extends Util +{ + private static final String USAGE_STRING = "filesender [options]
\n\nOptions:"; + + protected Filesender(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return false; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + final String directoryName = getArgs()[1]; + + try + { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + Connection conn = newConnection(); + + Session session = conn.createSession(); + + File directory = new File(directoryName); + if(directory.isDirectory() && directory.canWrite()) + { + + File tmpDirectory = new File(directoryName, ".tmp"); + if(!tmpDirectory.exists()) + { + tmpDirectory.mkdir(); + } + + String[] unsettledFiles = tmpDirectory.list(); + + + + Map unsettled = new HashMap(); + Map unsettledFileNames = new HashMap(); + for(String fileName : unsettledFiles) + { + File aFile = new File(tmpDirectory, fileName); + if(aFile.canRead() && aFile.canWrite()) + { + Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); + unsettled.put(deliveryTag, null); + unsettledFileNames.put(deliveryTag, fileName); + } + } + + + Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(), + unsettled); + + Map remoteUnsettled = s.getRemoteUnsettled(); + + for(Map.Entry entry: unsettledFileNames.entrySet()) + { + if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) + { + (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue())); + } + } + + if(remoteUnsettled != null) + { + for(Map.Entry entry : remoteUnsettled.entrySet()) + { + if(entry.getValue() instanceof Accepted) + { + final String fileName = unsettledFileNames.get(entry.getKey()); + if(fileName != null) + { + + Message resumed = new Message(); + resumed.setDeliveryTag(entry.getKey()); + resumed.setDeliveryState(entry.getValue()); + resumed.setResume(Boolean.TRUE); + resumed.setSettled(Boolean.TRUE); + + + + final File unsettledFile = new File(tmpDirectory, fileName); + unsettledFile.delete(); + + s.send(resumed); + + } + + } + else if(entry.getValue() instanceof Received || entry.getValue() == null) + { + final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey())); + Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile); + resumed.setResume(Boolean.TRUE); + Sender.OutcomeAction action = new Sender.OutcomeAction() + { + public void onOutcome(Binary deliveryTag, Outcome outcome) + { + if(outcome instanceof Accepted) + { + unsettledFile.delete(); + } + } + }; + s.send(resumed, action); + + } + } + } + + + + String[] files = directory.list(); + + for(String fileName : files) + { + final File file = new File(directory, fileName); + + if(file.canRead() && file.canWrite() && !file.isDirectory()) + { + Message message = createMessageFromFile(md5, fileName, file); + + final File unsettledFile = new File(tmpDirectory, fileName); + + Sender.OutcomeAction action = new Sender.OutcomeAction() + { + public void onOutcome(Binary deliveryTag, Outcome outcome) + { + if(outcome instanceof Accepted) + { + unsettledFile.delete(); + } + } + }; + + file.renameTo(unsettledFile); + + s.send(message, action); + } + } + + s.close(); + } + else + { + System.err.println("No such directory: " + directory); + } + session.close(); + conn.close(); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); + } catch (FileNotFoundException e) + { + e.printStackTrace(); + } catch (IOException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (NoSuchAlgorithmException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + } + + private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException + { + FileInputStream fis = new FileInputStream(file); + byte[] data = new byte[(int) file.length()]; + + int read = fis.read(data); + + fis.close(); + + Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName)); + Section amqpValue = new Data(new Binary(data)); + Message message = new Message(Arrays.asList(applicationProperties, amqpValue)); + Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); + message.setDeliveryTag(deliveryTag); + md5.reset(); + return message; + } + + public static void main(String[] args) + { + new Filesender(args).run(); + } +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java new file mode 100644 index 0000000000..0da9dc3fb7 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java @@ -0,0 +1,246 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.commons.cli.*; +import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; +import org.apache.qpid.amqp_1_0.type.messaging.Filter; +import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; + +import java.util.Collections; + +public class Receive extends Util +{ + private static final String USAGE_STRING = "receive [options]
\n\nOptions:"; + private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L); + private UnsignedLong _lastCorrelationId; + + public static void main(String[] args) + { + new Receive(args).run(); + } + + + public Receive(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return true; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + @Override + protected boolean hasFilterOption() + { + return true; + } + + protected void run() + { + + try + { + final String queue = getArgs()[0]; + + String message = ""; + + Connection conn = newConnection(); + + + Session session = conn.createSession(); + + Filter filter = null; + if(getFilter() != null) + { + String[] filterParts = getFilter().split("=",2); + if("exact-subject".equals(filterParts[0])) + { + filter = new ExactSubjectFilter(filterParts[1]); + } + else if("matching-subject".equals(filterParts[0])) + { + filter = new MatchingSubjectFilter(filterParts[1]); + } + else + { + System.err.println("Unknown filter type: " + filterParts[0]); + } + } + + Receiver r = + filter == null + ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink()) + : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null); + Transaction txn = null; + + int credit = 0; + int receivedCount = 0; + + if(!useStdIn()) + { + if(getArgs().length <= 2) + { + + Transaction txn2 = null; + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + txn2 = session.createSessionLocalTransaction(); + } + + for(int i = 0; i < getCount(); i++) + { + + if(credit == 0) + { + if(getCount() - i <= getWindowSize()) + { + credit = getCount() - i; + + } + else + { + credit = getWindowSize(); + + } + + { + r.setCredit(UnsignedInteger.valueOf(credit), false); + } + if(!isBlock()) + r.drain(); + } + + Message m = isBlock() ? r.receive() : r.receive(1000L); + credit--; + if(m==null) + { + break; + } + + + + r.acknowledge(m.getDeliveryTag(),txn); + + receivedCount++; + + System.out.println("Received Message : " + m.getPayload()); + } + + if(useTran()) + { + txn.commit(); + } + } + else + { + // TODO + } + } + else + { + // TODO + } + r.close(); + session.close(); + conn.close(); + System.out.println("Total Messages Received: " + receivedCount); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java new file mode 100644 index 0000000000..6e1d15376c --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java @@ -0,0 +1,249 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +import java.util.Arrays; + +public class Request extends Util +{ + private static final String USAGE_STRING = "request [options]
[ ...]\n\nOptions:"; + + public static void main(String[] args) + { + new Request(args).run(); + } + + public Request(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return true; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + public void run() + { + + try + { + + + final String queue = getArgs()[0]; + + String message = ""; + + Connection conn = newConnection(); + Session session = conn.createSession(); + + Connection conn2; + Session session2; + Receiver responseReceiver; + + if(isUseMultipleConnections()) + { + conn2 = newConnection(); + session2 = conn2.createSession(); + responseReceiver = session2.createTemporaryQueueReceiver(); + } + else + { + conn2 = null; + session2 = null; + responseReceiver = session.createTemporaryQueueReceiver(); + } + + + + + responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + + + Sender s = session.createSender(queue, getWindowSize(), getMode()); + + Transaction txn = null; + + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + } + + int received = 0; + + if(getArgs().length >= 2) + { + message = getArgs()[1]; + if(message.length() < getMessageSize()) + { + StringBuilder builder = new StringBuilder(getMessageSize()); + builder.append(message); + for(int x = message.length(); x < getMessageSize(); x++) + { + builder.append('.'); + } + message = builder.toString(); + } + + for(int i = 0; i < getCount(); i++) + { + Properties properties = new Properties(); + properties.setMessageId(UnsignedLong.valueOf(i)); + properties.setReplyTo(responseReceiver.getAddress()); + + AmqpValue amqpValue = new AmqpValue(message); + Section[] sections = { new Header() , properties, amqpValue}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1, txn); + + Message responseMessage = responseReceiver.receive(false); + if(responseMessage != null) + { + responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn); + received++; + } + } + } + + if(txn != null) + { + txn.commit(); + } + + + while(received < getCount()) + { + Message responseMessage = responseReceiver.receive(); + responseReceiver.acknowledge(responseMessage.getDeliveryTag()); + received++; + } + + + + + s.close(); + session.close(); + conn.close(); + + if(session2 != null) + { + session2.close(); + conn2.close(); + } + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return true; + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java new file mode 100644 index 0000000000..8d9de4893f --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java @@ -0,0 +1,347 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +import java.util.*; + +public class Respond extends Util +{ + private static final String USAGE_STRING = "respond [options]
\n\nOptions:"; + private Connection _conn; + private Session _session; + private Receiver _receiver; + private Transaction _txn; + private Map _senders; + private UnsignedLong _responseMsgId = UnsignedLong.ZERO; + private Connection _conn2; + private Session _session2; + + public Respond(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return true; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasSingleLinkPerConnectionMode() + { + return true; + } + + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + public static void main(String[] args) + { + new Respond(args).run(); + } + + public void run() + { + try + { + + _senders = new HashMap(); + + final String queue = getArgs()[0]; + + String message = ""; + + _conn = newConnection(); + + + + if(isUseMultipleConnections()) + { + _conn2 = newConnection(); + _session2 = _conn2.createSession(); + } + + + _session = _conn.createSession(); + + + _receiver = _session.createReceiver(queue, getMode()); + _txn = null; + + int credit = 0; + int receivedCount = 0; + _responseMsgId = UnsignedLong.ZERO; + + Random random = null; + int batch = 0; + List txnMessages = null; + if(useTran()) + { + if(getRollbackRatio() != 0) + { + random = new Random(); + } + batch = getBatchSize(); + _txn = _session.createSessionLocalTransaction(); + txnMessages = new ArrayList(batch); + } + + + for(int i = 0; receivedCount < getCount(); i++) + { + + if(credit == 0) + { + if(getCount() - i <= getWindowSize()) + { + credit = getCount() - i; + + } + else + { + credit = getWindowSize(); + + } + + _receiver.setCredit(UnsignedInteger.valueOf(credit), false); + + if(!isBlock()) + _receiver.drain(); + } + + Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L); + credit--; + if(m==null) + { + if(useTran() && batch != getBatchSize()) + { + _txn.commit(); + } + break; + } + + System.out.println("Received Message: " + m.getPayload()); + + respond(m); + + + + if(useTran()) + { + + txnMessages.add(m); + + if(--batch == 0) + { + + if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio()) + { + _txn.commit(); + txnMessages.clear(); + receivedCount += getBatchSize(); + } + else + { + System.out.println("Random Rollback"); + _txn.rollback(); + double result; + do + { + _txn = _session.createSessionLocalTransaction(); + + for(Message msg : txnMessages) + { + respond(msg); + } + + result = random.nextDouble(); + if(result sections = m.getPayload(); + String replyTo = null; + Object correlationId = null; + for(Section section : sections) + { + if(section instanceof Properties) + { + replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue(); + correlationId = ((Properties) section).getMessageId(); + break; + } + } + + if(replyTo != null) + { + Sender s = _senders.get(replyTo); + if(s == null) + { + s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize()); + _senders.put(replyTo, s); + } + + List
replySections = new ArrayList
(sections); + + ListIterator
sectionIterator = replySections.listIterator(); + + while(sectionIterator.hasNext()) + { + Section section = sectionIterator.next(); + if(section instanceof Properties) + { + Properties newProps = new Properties(); + newProps.setTo(replyTo); + newProps.setCorrelationId(correlationId); + newProps.setMessageId(_responseMsgId); + _responseMsgId = _responseMsgId.add(UnsignedLong.ONE); + sectionIterator.set(newProps); + } + } + + Message replyMessage = new Message(replySections); + System.out.println("Sent Message: " + replySections); + s.send(replyMessage, _txn); + + } + _receiver.acknowledge(m.getDeliveryTag(), _txn); + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java new file mode 100644 index 0000000000..6f6575e083 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java @@ -0,0 +1,244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.util.Arrays; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +public class Send extends Util +{ + private static final String USAGE_STRING = "send [options]
[ ...]\n\nOptions:"; + private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'}; + + + public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, Connection.ConnectionException + { + new Send(args).run(); + } + + + public Send(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return true; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return true; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + @Override + protected boolean hasSubjectOption() + { + return true; + } + + public void run() + { + + final String queue = getArgs()[0]; + + String message = ""; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + + Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName()); + + Transaction txn = null; + + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + } + + if(!useStdIn()) + { + if(getArgs().length <= 2) + { + if(getArgs().length == 2) + { + message = getArgs()[1]; + } + for(int i = 0; i < getCount(); i++) + { + + Properties properties = new Properties(); + properties.setMessageId(UnsignedLong.valueOf(i)); + if(getSubject() != null) + { + properties.setSubject(getSubject()); + } + Section bodySection; + byte[] bytes = (message + " " + i).getBytes(); + if(bytes.length < getMessageSize()) + { + byte[] origBytes = bytes; + bytes = new byte[getMessageSize()]; + System.arraycopy(origBytes,0,bytes,0,origBytes.length); + for(int x = origBytes.length; x < bytes.length; x++) + { + bytes[x] = (byte) '.'; + } + bodySection = new Data(new Binary(bytes)); + } + else + { + bodySection = new AmqpValue(message + " " + i); + } + + Section[] sections = {properties, bodySection}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1, txn); + } + } + else + { + for(int i = 1; i < getArgs().length; i++) + { + s.send(new Message(getArgs()[i]), txn); + } + + } + } + else + { + LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in)); + + + try + { + while((message = buf.readLine()) != null) + { + s.send(new Message(message), txn); + } + } + catch (IOException e) + { + // TODO + e.printStackTrace(); + } + } + + if(txn != null) + { + txn.commit(); + } + + s.close(); + + session.close(); + conn.close(); + } + catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //TODO. + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //TODO. + } + + + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java new file mode 100644 index 0000000000..6fe2a6d510 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java @@ -0,0 +1,529 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.commons.cli.*; + +import java.util.logging.*; + +public abstract class Util +{ + + private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); + private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); + private String _host; + private String _username; + private String _password; + private int _port; + private int _count; + private boolean _useStdIn; + private boolean _useTran; + private String[] _args; + private AcknowledgeMode _mode; + private boolean _block; + private int _frameSize; + private int _messageSize; + private String _responseQueue; + private int _batchSize; + private double _rollbackRatio; + private String _linkName; + private String _containerName; + private boolean _durableLink; + private boolean _useMultipleConnections; + private int _windowSize = 100; + private String _subject; + private String _filter; + private String _remoteHost; + private boolean _useSSL; + + protected Util(String[] args) + { + CommandLineParser cmdLineParse = new PosixParser(); + + Options options = new Options(); + options.addOption("h","help",false,"show this help message and exit"); + options.addOption(OptionBuilder.withLongOpt("host") + .withDescription( "host to connect to (default 0.0.0.0)" ) + .hasArg(true) + .withArgName("HOST") + .create('H')); + options.addOption(OptionBuilder.withLongOpt("username") + .withDescription( "username to use for authentication" ) + .hasArg(true) + .withArgName("USERNAME") + .create('u')); + options.addOption(OptionBuilder.withLongOpt("password") + .withDescription( "password to use for authentication" ) + .hasArg(true) + .withArgName("PASSWORD") + .create('w')); + options.addOption(OptionBuilder.withLongOpt("port") + .withDescription( "port to connect to (default 5672)" ) + .hasArg(true) + .withArgName("PORT") + .create('p')); + options.addOption(OptionBuilder.withLongOpt("frame-size") + .withDescription( "specify the maximum frame size" ) + .hasArg(true) + .withArgName("FRAME_SIZE") + .create('f')); + options.addOption(OptionBuilder.withLongOpt("container-name") + .withDescription( "Container name" ) + .hasArg(true) + .withArgName("CONTAINER_NAME") + .create('C')); + + options.addOption(OptionBuilder.withLongOpt("ssl") + .withDescription("Use SSL") + .create('S')); + + options.addOption(OptionBuilder.withLongOpt("remote-hostname") + .withDescription( "hostname to supply in the open frame" ) + .hasArg(true) + .withArgName("HOST") + .create('O')); + + if(hasBlockOption()) + options.addOption(OptionBuilder.withLongOpt("block") + .withDescription("block until messages arrive") + .create('b')); + + if(hasCountOption()) + options.addOption(OptionBuilder.withLongOpt("count") + .withDescription( "number of messages to send (default 1)" ) + .hasArg(true) + .withArgName("COUNT") + .create('c')); + if(hasModeOption()) + options.addOption(OptionBuilder.withLongOpt("acknowledge-mode") + .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" ) + .hasArg(true) + .withArgName("MODE") + .create('k')); + + if(hasSubjectOption()) + options.addOption(OptionBuilder.withLongOpt("subject") + .withDescription( "subject message property" ) + .hasArg(true) + .withArgName("SUBJECT") + .create('s')); + + + if(hasSingleLinkPerConnectionMode()) + options.addOption(OptionBuilder.withLongOpt("single-link-per-connection") + .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once") + .hasArg(false) + .create('Z')); + + if(hasFilterOption()) + options.addOption(OptionBuilder.withLongOpt("filter") + .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#") + .hasArg(true) + .withArgName("=") + .create('F')); + + + if(hasTxnOption()) + { + options.addOption("x","txn",false,"use transactions"); + options.addOption(OptionBuilder.withLongOpt("batch-size") + .withDescription( "transaction batch size (default: 1)" ) + .hasArg(true) + .withArgName("BATCH-SIZE") + .create('B')); + options.addOption(OptionBuilder.withLongOpt("rollback-ratio") + .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" ) + .hasArg(true) + .withArgName("RATIO") + .create('R')); + } + + if(hasLinkDurableOption()) + { + options.addOption("d","durable-link",false,"use a durable link"); + } + + if(hasStdInOption()) + options.addOption("i","stdin",false,"read messages from stdin (one message per line)"); + + options.addOption(OptionBuilder.withLongOpt("trace") + .withDescription("trace logging specified categories: RAW, FRM") + .hasArg(true) + .withArgName("TRACE") + .create('t')); + if(hasSizeOption()) + options.addOption(OptionBuilder.withLongOpt("message-size") + .withDescription( "size to pad outgoing messages to" ) + .hasArg(true) + .withArgName("SIZE") + .create('z')); + + if(hasResponseQueueOption()) + options.addOption(OptionBuilder.withLongOpt("response-queue") + .withDescription( "response queue to reply to" ) + .hasArg(true) + .withArgName("RESPONSE_QUEUE") + .create('r')); + + if(hasLinkNameOption()) + { + options.addOption(OptionBuilder.withLongOpt("link") + .withDescription( "link name" ) + .hasArg(true) + .withArgName("LINK") + .create('l')); + } + + if(hasWindowSizeOption()) + { + options.addOption(OptionBuilder.withLongOpt("window-size") + .withDescription("credit window size") + .hasArg(true) + .withArgName("WINDOW-SIZE") + .create('W')); + } + + CommandLine cmdLine = null; + try + { + cmdLine = cmdLineParse.parse(options, args); + + } + catch (ParseException e) + { + printUsage(options); + System.exit(-1); + } + + if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty()) + { + printUsage(options); + System.exit(0); + } + _host = cmdLine.getOptionValue('H',"0.0.0.0"); + _remoteHost = cmdLine.getOptionValue('O',null); + String portStr = cmdLine.getOptionValue('p',"5672"); + String countStr = cmdLine.getOptionValue('c',"1"); + + _useSSL = cmdLine.hasOption('S'); + + if(hasWindowSizeOption()) + { + String windowSizeStr = cmdLine.getOptionValue('W',"100"); + _windowSize = Integer.parseInt(windowSizeStr); + } + + if(hasSubjectOption()) + { + _subject = cmdLine.getOptionValue('s'); + } + + if(cmdLine.hasOption('u')) + { + _username = cmdLine.getOptionValue('u'); + } + + if(cmdLine.hasOption('w')) + { + _password = cmdLine.getOptionValue('w'); + } + + if(cmdLine.hasOption('F')) + { + _filter = cmdLine.getOptionValue('F'); + } + + _port = Integer.parseInt(portStr); + + _containerName = cmdLine.getOptionValue('C'); + + if(hasBlockOption()) + _block = cmdLine.hasOption('b'); + + if(hasLinkNameOption()) + _linkName = cmdLine.getOptionValue('l'); + + + if(hasLinkDurableOption()) + _durableLink = cmdLine.hasOption('d'); + + if(hasCountOption()) + _count = Integer.parseInt(countStr); + + if(hasStdInOption()) + _useStdIn = cmdLine.hasOption('i'); + + if(hasSingleLinkPerConnectionMode()) + _useMultipleConnections = cmdLine.hasOption('Z'); + + if(hasTxnOption()) + { + _useTran = cmdLine.hasOption('x'); + _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1")); + _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0")); + } + + if(hasModeOption()) + { + _mode = AcknowledgeMode.ALO; + + if(cmdLine.hasOption('k')) + { + _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k')); + } + } + + if(hasResponseQueueOption()) + { + _responseQueue = cmdLine.getOptionValue('r'); + } + + _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536")); + + if(hasSizeOption()) + { + _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1")); + } + + String categoriesList = cmdLine.getOptionValue('t'); + String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]"); + for(String cat : categories) + { + if(cat.equalsIgnoreCase("FRM")) + { + FRAME_LOGGER.setLevel(Level.FINE); + Formatter formatter = new Formatter() + { + @Override + public String format(final LogRecord record) + { + return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n"; + } + }; + for(Handler handler : FRAME_LOGGER.getHandlers()) + { + FRAME_LOGGER.removeHandler(handler); + } + Handler handler = new ConsoleHandler(); + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + FRAME_LOGGER.addHandler(handler); + } + else if (cat.equalsIgnoreCase("RAW")) + { + RAW_LOGGER.setLevel(Level.FINE); + Formatter formatter = new Formatter() + { + @Override + public String format(final LogRecord record) + { + return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n"; + } + }; + for(Handler handler : RAW_LOGGER.getHandlers()) + { + RAW_LOGGER.removeHandler(handler); + } + Handler handler = new ConsoleHandler(); + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + RAW_LOGGER.addHandler(handler); + } + } + + + _args = cmdLine.getArgs(); + + } + + protected boolean hasFilterOption() + { + return false; + } + + protected boolean hasSubjectOption() + { + return false; + } + + protected boolean hasWindowSizeOption() + { + return false; + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return false; + } + + protected abstract boolean hasLinkDurableOption(); + + protected abstract boolean hasLinkNameOption(); + + protected abstract boolean hasResponseQueueOption(); + + protected abstract boolean hasSizeOption(); + + protected abstract boolean hasBlockOption(); + + protected abstract boolean hasStdInOption(); + + protected abstract boolean hasTxnOption(); + + protected abstract boolean hasModeOption(); + + protected abstract boolean hasCountOption(); + + public String getHost() + { + return _host; + } + + public String getUsername() + { + return _username; + } + + public String getPassword() + { + return _password; + } + + public int getPort() + { + return _port; + } + + public int getCount() + { + return _count; + } + + public boolean useStdIn() + { + return _useStdIn; + } + + public boolean useTran() + { + return _useTran; + } + + public AcknowledgeMode getMode() + { + return _mode; + } + + public boolean isBlock() + { + return _block; + } + + public String[] getArgs() + { + return _args; + } + + public int getMessageSize() + { + return _messageSize; + } + + public String getResponseQueue() + { + return _responseQueue; + } + + public int getBatchSize() + { + return _batchSize; + } + + public double getRollbackRatio() + { + return _rollbackRatio; + } + + public String getLinkName() + { + return _linkName; + } + + public boolean isDurableLink() + { + return _durableLink; + } + + public boolean isUseMultipleConnections() + { + return _useMultipleConnections; + } + + public void setUseMultipleConnections(boolean useMultipleConnections) + { + _useMultipleConnections = useMultipleConnections; + } + + public String getSubject() + { + return _subject; + } + + public void setSubject(String subject) + { + _subject = subject; + } + + protected abstract void printUsage(final Options options); + + protected abstract void run(); + + + public Connection newConnection() throws Connection.ConnectionException + { + Container container = getContainerName() == null ? new Container() : new Container(getContainerName()); + return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container, + _remoteHost == null ? getHost() : _remoteHost, _useSSL) + : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize, + container, _remoteHost == null ? getHost() : _remoteHost, _useSSL); + } + + public String getContainerName() + { + return _containerName; + } + + public int getWindowSize() + { + return _windowSize; + } + + public void setWindowSize(int windowSize) + { + _windowSize = windowSize; + } + + public String getFilter() + { + return _filter; + } +} diff --git a/java/amqp-1-0-client/resources/LICENSE b/java/amqp-1-0-client/resources/LICENSE new file mode 100644 index 0000000000..de4b130f35 --- /dev/null +++ b/java/amqp-1-0-client/resources/LICENSE @@ -0,0 +1,204 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + diff --git a/java/amqp-1-0-client/resources/NOTICE b/java/amqp-1-0-client/resources/NOTICE new file mode 100644 index 0000000000..8d1c3f3122 --- /dev/null +++ b/java/amqp-1-0-client/resources/NOTICE @@ -0,0 +1,5 @@ +Apache Qpid +Copyright 2006-2012 Apache Software Foundation +This product includes software developed at +Apache Software Foundation (http://www.apache.org/) + diff --git a/java/amqp-1-0-client/resources/README.txt b/java/amqp-1-0-client/resources/README.txt new file mode 100644 index 0000000000..35d25050fe --- /dev/null +++ b/java/amqp-1-0-client/resources/README.txt @@ -0,0 +1,7 @@ + +Documentation +-------------- +All of our user documentation can be accessed at: + +http://qpid.apache.org/documentation.html + diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java deleted file mode 100644 index 3bb26744c4..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.lang.reflect.InvocationTargetException; - -public class Command -{ - public static void main(String[] args) throws - ClassNotFoundException, - NoSuchMethodException, - InvocationTargetException, - IllegalAccessException, - InstantiationException - { - String name = args[0]; - String[] cmdArgs = new String[args.length-1]; - System.arraycopy(args,1,cmdArgs,0,args.length-1); - name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase(); - Class clazz = (Class) Class.forName(name); - Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs); - util.run(); - - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index e3d56fae09..e501662dbb 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -20,6 +20,15 @@ */ package org.apache.qpid.amqp_1_0.client; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.net.ssl.SSLSocketFactory; import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; import org.apache.qpid.amqp_1_0.transport.AMQPTransport; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; @@ -30,17 +39,6 @@ import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.logging.Level; -import java.util.logging.Logger; - public class Connection { private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); @@ -224,7 +222,6 @@ public class Connection } - //ConnectionHandler.OutputHandler outputHandler = new ConnectionHandler.OutputHandler(outputStream, out, _conn.getDescribedTypeRegistry()); ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn); Thread outputThread = new Thread(outputHandler); outputThread.setDaemon(true); @@ -236,8 +233,6 @@ public class Connection final ConnectionHandler handler = new ConnectionHandler(_conn); final InputStream inputStream = s.getInputStream(); - //final AMQPTransport transport = new AMQPTransport(new AMQPFrameTransport(_conn)); - Thread inputThread = new Thread(new Runnable() { @@ -246,7 +241,6 @@ public class Connection try { doRead(handler, inputStream); -// doRead(transport, inputStream); } finally { @@ -268,85 +262,6 @@ public class Connection inputThread.setDaemon(true); inputThread.start(); -/* - Thread outputThread = new Thread(new Runnable() - { - - private int _lastWrite; - - public void run() - { - try - { -// doRead(handler, inputStream); - final Object lock = new Object(); - transport.setOutputStateChangeListener(new StateChangeListener() - { - - public void onStateChange(final boolean active) - { - synchronized (lock) - { - lock.notifyAll(); - } - } - }); - - synchronized(lock) - { - while(transport.isOpenForOutput()) - { - _lastWrite = 0; - transport.getNextBytes(new BytesProcessor() - { - - public void processBytes(final ByteBuffer buf) - { - _lastWrite = buf.remaining(); - try - { - outputStream.write(buf.array(), - buf.arrayOffset() + buf.position(), - buf.limit() - buf.position()); - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - }); - if(_lastWrite == 0 && transport.isOpenForOutput()) - { - try - { - lock.wait(1000); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - } - finally - { - if(_conn.closedForInput() && _conn.closedForOutput()) - { - try - { - s.close(); - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - }); -*/ - _conn.open(); } @@ -394,7 +309,7 @@ public class Connection } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } } @@ -419,7 +334,7 @@ public class Connection { int read; boolean done = false; - while(!done && (read = inputStream.read(buf)) != -1) + while(!handler.isDone() && (read = inputStream.read(buf)) != -1) { ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); Binary b = new Binary(buf,0,read); @@ -428,12 +343,6 @@ public class Connection { RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString()); } - /*System.err.println(b); - System.err.println("XXX: " + bbuf.hasRemaining() + "; " + handler.isDone()); - if(handler.isDone()) - { - System.err.println(handler.getClass().getName() + "IS DONE!"); - } */ while(bbuf.hasRemaining() && !handler.isDone()) { handler.parse(bbuf); @@ -444,7 +353,7 @@ public class Connection } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } } diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java deleted file mode 100644 index b58ce6bfe5..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class Demo extends Util -{ - private static final String USAGE_STRING = "demo [options] [ ...]\n\nOptions:"; - private static final String OPCODE = "opcode"; - private static final String ACTION = "action"; - private static final String MESSAGE_ID = "message-id"; - private static final String VENDOR = "vendor"; - private static final String LOG = "log"; - private static final String RECEIVED = "received"; - private static final String TEST = "test"; - private static final String APACHE = "apache"; - private static final String SENT = "sent"; - private static final String LINK_REF = "link-ref"; - private static final String HOST = "host"; - private static final String PORT = "port"; - private static final String SASL_USER = "sasl-user"; - private static final String SASL_PASSWORD = "sasl-password"; - private static final String ROLE = "role"; - private static final String ADDRESS = "address"; - private static final String SENDER = "sender"; - private static final String SEND_MESSAGE = "send-message"; - private static final String ANNOUNCE = "announce"; - private static final String MESSAGE_VENDOR = "message-vendor"; - private static final String CREATE_LINK = "create-link"; - - public static void main(String[] args) - { - new Demo(args).run(); - } - - public Demo(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected boolean hasWindowSizeOption() - { - return false; - } - - public void run() - { - - try - { - - final String vendor = getArgs()[0]; - final String queue = "control"; - - String message = ""; - - Connection conn = newConnection(); - Session session = conn.createSession(); - - - Receiver responseReceiver; - - responseReceiver = session.createTemporaryQueueReceiver(); - - - - - responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - - - Sender s = session.createSender(queue, getWindowSize(), getMode()); - - - Properties properties = new Properties(); - properties.setMessageId(java.util.UUID.randomUUID()); - properties.setReplyTo(responseReceiver.getAddress()); - - HashMap appPropMap = new HashMap(); - ApplicationProperties appProperties = new ApplicationProperties(appPropMap); - - appPropMap.put(OPCODE, ANNOUNCE); - appPropMap.put(VENDOR, vendor); - appPropMap.put(ADDRESS,responseReceiver.getAddress()); - - AmqpValue amqpValue = new AmqpValue(message); - Section[] sections = { properties, appProperties, amqpValue}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1); - - Map sendingLinks = new HashMap(); - Map receivingLinks = new HashMap(); - - - boolean done = false; - - while(!done) - { - boolean wait = true; - Message m = responseReceiver.receive(false); - if(m != null) - { - List
payload = m.getPayload(); - wait = false; - ApplicationProperties props = m.getApplicationProperties(); - Map map = props.getValue(); - String op = (String) map.get(OPCODE); - if("reset".equals(op)) - { - for(Sender sender : sendingLinks.values()) - { - try - { - sender.close(); - Session session1 = sender.getSession(); - session1.close(); - session1.getConnection().close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - for(Receiver receiver : receivingLinks.values()) - { - try - { - receiver.close(); - receiver.getSession().close(); - receiver.getSession().getConnection().close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - sendingLinks.clear(); - receivingLinks.clear(); - } - else if(CREATE_LINK.equals(op)) - { - Object linkRef = map.get(LINK_REF); - String host = (String) map.get(HOST); - Object o = map.get(PORT); - int port = Integer.parseInt(String.valueOf(o)); - String user = (String) map.get(SASL_USER); - String password = (String) map.get(SASL_PASSWORD); - String role = (String) map.get(ROLE); - String address = (String) map.get(ADDRESS); - System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password); - try{ - - - Connection conn2 = new Connection(host, port, user, password, host); - Session session2 = conn2.createSession(); - if(sendingLinks.containsKey(linkRef)) - { - try - { - sendingLinks.remove(linkRef).close(); - } - catch (Exception e) - { - - } - } - if(receivingLinks.containsKey(linkRef)) - { - try - { - receivingLinks.remove(linkRef).close(); - } - catch (Exception e) - { - - } - } - if(SENDER.equals(role)) - { - - System.err.println("%%% Creating sender (" + linkRef + ")"); - Sender sender = session2.createSender(address); - sendingLinks.put(linkRef, sender); - } - else - { - - System.err.println("%%% Creating receiver (" + linkRef + ")"); - Receiver receiver2 = session2.createReceiver(address); - receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - receivingLinks.put(linkRef, receiver2); - } - } - catch(Exception e) - { - e.printStackTrace(); - } - } - else if(SEND_MESSAGE.equals(op)) - { - Sender sender = sendingLinks.get(map.get(LINK_REF)); - Properties m2props = new Properties(); - Object messageId = map.get(MESSAGE_ID); - m2props.setMessageId(messageId); - Map m2propmap = new HashMap(); - m2propmap.put(OPCODE, TEST); - m2propmap.put(VENDOR, vendor); - ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); - Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); - sender.send(m2); - - Map m3propmap = new HashMap(); - m3propmap.put(OPCODE, LOG); - m3propmap.put(ACTION, SENT); - m3propmap.put(MESSAGE_ID, messageId); - m3propmap.put(VENDOR, vendor); - m3propmap.put(MESSAGE_VENDOR, vendor); - - - Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), - new AmqpValue("AMQP-"+messageId))); - s.send(m3); - - } - - responseReceiver.acknowledge(m); - } - else - { - for(Map.Entry entry : receivingLinks.entrySet()) - { - m = entry.getValue().receive(false); - if(m != null) - { - wait = false; - - System.err.println("%%% Received message from " + entry.getKey()); - - Properties mp = m.getProperties(); - ApplicationProperties ap = m.getApplicationProperties(); - - Map m3propmap = new HashMap(); - m3propmap.put(OPCODE, LOG); - m3propmap.put(ACTION, RECEIVED); - m3propmap.put(MESSAGE_ID, mp.getMessageId()); - m3propmap.put(VENDOR, vendor); - m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR)); - - Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), - new AmqpValue("AMQP-"+mp.getMessageId()))); - s.send(m3); - - entry.getValue().acknowledge(m); - } - - } - } - - if(wait) - { - try - { - Thread.sleep(500l); - } - catch (InterruptedException e) - { - e.printStackTrace(); //TODO. - } - } - - } - - - - - - - - - - s.close(); - session.close(); - conn.close(); - - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return false; - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java deleted file mode 100644 index 65d27b21f8..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.commons.cli.Options; - -public class Dump extends Util -{ - private static final String USAGE_STRING = "dump [options]
\n\nOptions:"; - - - protected Dump(String[] args) - { - super(args); - } - - public static void main(String[] args) - { - new Dump(args).run(); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasLinkNameOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasSizeOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasBlockOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasStdInOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasTxnOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasModeOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasCountOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected void printUsage(Options options) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - - Sender s = session.createSender(queue, 10); - - Message message = new Message("dump me"); - message.setDeliveryTag(new Binary("dump".getBytes())); - - s.send(message); - - s.close(); - session.close(); - conn.close(); - - } catch (Connection.ConnectionException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java deleted file mode 100644 index 4d98655ad2..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; - -import java.io.*; -import java.nio.ByteBuffer; -import java.util.*; - -public class Filereceiver extends Util -{ - private static final String USAGE_STRING = "filereceiver [options]
\n\nOptions:"; - - protected Filereceiver(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return false; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - final String directoryName = getArgs()[1]; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - final File directory = new File(directoryName); - if(directory.isDirectory() && directory.canWrite()) - { - File tmpDirectory = new File(directoryName, ".tmp"); - if(!tmpDirectory.exists()) - { - tmpDirectory.mkdir(); - } - - String[] unsettledFiles = tmpDirectory.list(); - - Map unsettled = new HashMap(); - final Map unsettledFileNames = new HashMap(); - - Accepted accepted = new Accepted(); - - for(String fileName : unsettledFiles) - { - File theFile = new File(tmpDirectory, fileName); - if(theFile.isFile()) - { - if(fileName.startsWith("~") && fileName.endsWith("~")) - { - theFile.delete(); - } - else - { - int splitPoint = fileName.indexOf("."); - String deliveryTagStr = fileName.substring(0,splitPoint); - String actualFileName = fileName.substring(splitPoint+1); - - byte[] bytes = new byte[deliveryTagStr.length()/2]; - - - for(int i = 0; i < bytes.length; i++) - { - char c = deliveryTagStr.charAt(2*i); - char d = deliveryTagStr.charAt(1+(2*i)); - - bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) - | (d <= '9' ? d - '0' : d - 'W')); - - } - Binary deliveryTag = new Binary(bytes); - unsettled.put(deliveryTag, accepted); - unsettledFileNames.put(deliveryTag, fileName); - } - } - - } - - Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), - unsettled); - - Map remoteUnsettled = r.getRemoteUnsettled(); - - for(Map.Entry entry : unsettledFileNames.entrySet()) - { - if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) - { - - File tmpFile = new File(tmpDirectory, entry.getValue()); - final File dest = new File(directory, - entry.getValue().substring(entry.getValue().indexOf(".") + 1)); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - - tmpFile.renameTo(dest); - } - } - - - int credit = 10; - - r.setCredit(UnsignedInteger.valueOf(credit), true); - - - int received = 0; - Message m = null; - do - { - m = isBlock() && received == 0 ? r.receive() : r.receive(10000); - if(m != null) - { - if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) - { - final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); - final File unsettledFile = new File(tmpDirectory, - tmpFileName); - r.acknowledge(m, new Receiver.SettledAction() - { - public void onSettled(final Binary deliveryTag) - { - int splitPoint = tmpFileName.indexOf("."); - - String fileName = tmpFileName.substring(splitPoint+1); - - final File dest = new File(directory, fileName); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - unsettledFile.renameTo(dest); - unsettledFileNames.remove(deliveryTag); - } - }); - } - else - { - received++; - List
sections = m.getPayload(); - Binary deliveryTag = m.getDeliveryTag(); - StringBuilder tagNameBuilder = new StringBuilder(); - - ByteBuffer dtbuf = deliveryTag.asByteBuffer(); - while(dtbuf.hasRemaining()) - { - tagNameBuilder.append(String.format("%02x", dtbuf.get())); - } - - - ApplicationProperties properties = null; - List data = new ArrayList(); - int totalSize = 0; - for(Section section : sections) - { - if(section instanceof ApplicationProperties) - { - properties = (ApplicationProperties) section; - } - else if(section instanceof AmqpValue) - { - AmqpValue value = (AmqpValue) section; - if(value.getValue() instanceof Binary) - { - Binary binary = (Binary) value.getValue(); - data.add(binary); - totalSize += binary.getLength(); - - } - else - { - // TODO exception - } - } - else if(section instanceof Data) - { - Data value = (Data) section; - Binary binary = value.getValue(); - data.add(binary); - totalSize += binary.getLength(); - - } - } - if(properties != null) - { - final String fileName = (String) properties.getValue().get("filename"); - byte[] fileData = new byte[totalSize]; - ByteBuffer buf = ByteBuffer.wrap(fileData); - int offset = 0; - for(Binary bin : data) - { - buf.put(bin.asByteBuffer()); - } - File outputFile = new File(tmpDirectory, "~"+fileName+"~"); - if(outputFile.exists()) - { - outputFile.delete(); - } - FileOutputStream fos = new FileOutputStream(outputFile); - fos.write(fileData); - fos.flush(); - fos.close(); - - final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + - fileName); - outputFile.renameTo(unsettledFile); - r.acknowledge(m, new Receiver.SettledAction() - { - public void onSettled(final Binary deliveryTag) - { - final File dest = new File(directory, fileName); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - unsettledFile.renameTo(dest); - - } - }); - - } - } - } - } - while(m != null); - - - r.close(); - } - else - { - System.err.println("No such directory: " + directoryName); - } - session.close(); - conn.close(); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); - } - catch (FileNotFoundException e) - { - e.printStackTrace(); //TODO. - } - catch (IOException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - public static void main(String[] args) - { - new Filereceiver(args).run(); - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java deleted file mode 100644 index 46e6ba537f..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.*; - -public class Filesender extends Util -{ - private static final String USAGE_STRING = "filesender [options]
\n\nOptions:"; - - protected Filesender(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return false; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - final String directoryName = getArgs()[1]; - - try - { - MessageDigest md5 = MessageDigest.getInstance("MD5"); - Connection conn = newConnection(); - - Session session = conn.createSession(); - - File directory = new File(directoryName); - if(directory.isDirectory() && directory.canWrite()) - { - - File tmpDirectory = new File(directoryName, ".tmp"); - if(!tmpDirectory.exists()) - { - tmpDirectory.mkdir(); - } - - String[] unsettledFiles = tmpDirectory.list(); - - - - Map unsettled = new HashMap(); - Map unsettledFileNames = new HashMap(); - for(String fileName : unsettledFiles) - { - File aFile = new File(tmpDirectory, fileName); - if(aFile.canRead() && aFile.canWrite()) - { - Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); - unsettled.put(deliveryTag, null); - unsettledFileNames.put(deliveryTag, fileName); - } - } - - - Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(), - unsettled); - - Map remoteUnsettled = s.getRemoteUnsettled(); - - for(Map.Entry entry: unsettledFileNames.entrySet()) - { - if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) - { - (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue())); - } - } - - if(remoteUnsettled != null) - { - for(Map.Entry entry : remoteUnsettled.entrySet()) - { - if(entry.getValue() instanceof Accepted) - { - final String fileName = unsettledFileNames.get(entry.getKey()); - if(fileName != null) - { - - Message resumed = new Message(); - resumed.setDeliveryTag(entry.getKey()); - resumed.setDeliveryState(entry.getValue()); - resumed.setResume(Boolean.TRUE); - resumed.setSettled(Boolean.TRUE); - - - - final File unsettledFile = new File(tmpDirectory, fileName); - unsettledFile.delete(); - - s.send(resumed); - - } - - } - else if(entry.getValue() instanceof Received || entry.getValue() == null) - { - final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey())); - Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile); - resumed.setResume(Boolean.TRUE); - Sender.OutcomeAction action = new Sender.OutcomeAction() - { - public void onOutcome(Binary deliveryTag, Outcome outcome) - { - if(outcome instanceof Accepted) - { - unsettledFile.delete(); - } - } - }; - s.send(resumed, action); - - } - } - } - - - - String[] files = directory.list(); - - for(String fileName : files) - { - final File file = new File(directory, fileName); - - if(file.canRead() && file.canWrite() && !file.isDirectory()) - { - Message message = createMessageFromFile(md5, fileName, file); - - final File unsettledFile = new File(tmpDirectory, fileName); - - Sender.OutcomeAction action = new Sender.OutcomeAction() - { - public void onOutcome(Binary deliveryTag, Outcome outcome) - { - if(outcome instanceof Accepted) - { - unsettledFile.delete(); - } - } - }; - - file.renameTo(unsettledFile); - - s.send(message, action); - } - } - - s.close(); - } - else - { - System.err.println("No such directory: " + directory); - } - session.close(); - conn.close(); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); - } catch (FileNotFoundException e) - { - e.printStackTrace(); - } catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (NoSuchAlgorithmException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - - } - - private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException - { - FileInputStream fis = new FileInputStream(file); - byte[] data = new byte[(int) file.length()]; - - int read = fis.read(data); - - fis.close(); - - Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName)); - Section amqpValue = new Data(new Binary(data)); - Message message = new Message(Arrays.asList(applicationProperties, amqpValue)); - Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); - message.setDeliveryTag(deliveryTag); - md5.reset(); - return message; - } - - public static void main(String[] args) - { - new Filesender(args).run(); - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java deleted file mode 100644 index 07ae54b54f..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.codec.ValueHandler; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -public class ReadBytes -{ - - public static void main(String[] args) throws IOException, AmqpErrorException - { - - if(args.length == 0) - { - readBytes(System.in); - } - else - { - for(String fileName : args) - { - System.out.println("=========================== " + fileName + " ==========================="); - final FileInputStream fis = new FileInputStream(fileName); - readBytes(fis); - fis.close(); - } - } - - } - - private static void readBytes(final InputStream inputStream) throws IOException, AmqpErrorException - { - byte[] bytes = new byte[4096]; - - ValueHandler valueHandler = new ValueHandler(AMQPDescribedTypeRegistry.newInstance()); - - int count; - - while((count = inputStream.read(bytes))!=-1) - { - ByteBuffer buf = ByteBuffer.wrap(bytes); - buf.limit(count); - while(buf.hasRemaining()) - { - - final Object value = valueHandler.parse(buf); - System.out.print((value == null ? "" : value.getClass().getName() + ":") +value +"\n"); - - } - } - - } - - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java deleted file mode 100644 index 0da9dc3fb7..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.commons.cli.*; -import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; - -import java.util.Collections; - -public class Receive extends Util -{ - private static final String USAGE_STRING = "receive [options]
\n\nOptions:"; - private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L); - private UnsignedLong _lastCorrelationId; - - public static void main(String[] args) - { - new Receive(args).run(); - } - - - public Receive(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return true; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - @Override - protected boolean hasFilterOption() - { - return true; - } - - protected void run() - { - - try - { - final String queue = getArgs()[0]; - - String message = ""; - - Connection conn = newConnection(); - - - Session session = conn.createSession(); - - Filter filter = null; - if(getFilter() != null) - { - String[] filterParts = getFilter().split("=",2); - if("exact-subject".equals(filterParts[0])) - { - filter = new ExactSubjectFilter(filterParts[1]); - } - else if("matching-subject".equals(filterParts[0])) - { - filter = new MatchingSubjectFilter(filterParts[1]); - } - else - { - System.err.println("Unknown filter type: " + filterParts[0]); - } - } - - Receiver r = - filter == null - ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink()) - : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null); - Transaction txn = null; - - int credit = 0; - int receivedCount = 0; - - if(!useStdIn()) - { - if(getArgs().length <= 2) - { - - Transaction txn2 = null; - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - txn2 = session.createSessionLocalTransaction(); - } - - for(int i = 0; i < getCount(); i++) - { - - if(credit == 0) - { - if(getCount() - i <= getWindowSize()) - { - credit = getCount() - i; - - } - else - { - credit = getWindowSize(); - - } - - { - r.setCredit(UnsignedInteger.valueOf(credit), false); - } - if(!isBlock()) - r.drain(); - } - - Message m = isBlock() ? r.receive() : r.receive(1000L); - credit--; - if(m==null) - { - break; - } - - - - r.acknowledge(m.getDeliveryTag(),txn); - - receivedCount++; - - System.out.println("Received Message : " + m.getPayload()); - } - - if(useTran()) - { - txn.commit(); - } - } - else - { - // TODO - } - } - else - { - // TODO - } - r.close(); - session.close(); - conn.close(); - System.out.println("Total Messages Received: " + receivedCount); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index ad390fd498..8b792db1f1 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -241,7 +241,7 @@ public class Receiver implements DeliveryStateHandler } if(hasMore) { - xfr = receiveFromPrefetch(0L); + xfr = receiveFromPrefetch(-1l); if(xfr== null) { // TODO - this is wrong!!!! @@ -503,6 +503,37 @@ public class Receiver implements DeliveryStateHandler _endpoint.drain(); } + /** + * Waits for the receiver to drain or a message to be available to be received. + * @return true if the receiver has been drained. + */ + public boolean drainWait() + { + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + try + { + while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() ) + { + lock.wait(); + } + } + catch (InterruptedException e) + { + } + } + return _prefetchQueue.peek()==null && _endpoint.isDrained(); + } + + /** + * Clears the receiver drain so that message delivery can resume. + */ + public void clearDrain() + { + _endpoint.clearDrain(); + } + public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn) { _endpoint.setLinkCredit(credit); @@ -558,4 +589,4 @@ public class Receiver implements DeliveryStateHandler void messageArrived(Receiver receiver); } -} \ No newline at end of file +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java deleted file mode 100644 index 6e1d15376c..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -import java.util.Arrays; - -public class Request extends Util -{ - private static final String USAGE_STRING = "request [options]
[ ...]\n\nOptions:"; - - public static void main(String[] args) - { - new Request(args).run(); - } - - public Request(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return true; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - public void run() - { - - try - { - - - final String queue = getArgs()[0]; - - String message = ""; - - Connection conn = newConnection(); - Session session = conn.createSession(); - - Connection conn2; - Session session2; - Receiver responseReceiver; - - if(isUseMultipleConnections()) - { - conn2 = newConnection(); - session2 = conn2.createSession(); - responseReceiver = session2.createTemporaryQueueReceiver(); - } - else - { - conn2 = null; - session2 = null; - responseReceiver = session.createTemporaryQueueReceiver(); - } - - - - - responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - - - Sender s = session.createSender(queue, getWindowSize(), getMode()); - - Transaction txn = null; - - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - } - - int received = 0; - - if(getArgs().length >= 2) - { - message = getArgs()[1]; - if(message.length() < getMessageSize()) - { - StringBuilder builder = new StringBuilder(getMessageSize()); - builder.append(message); - for(int x = message.length(); x < getMessageSize(); x++) - { - builder.append('.'); - } - message = builder.toString(); - } - - for(int i = 0; i < getCount(); i++) - { - Properties properties = new Properties(); - properties.setMessageId(UnsignedLong.valueOf(i)); - properties.setReplyTo(responseReceiver.getAddress()); - - AmqpValue amqpValue = new AmqpValue(message); - Section[] sections = { new Header() , properties, amqpValue}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1, txn); - - Message responseMessage = responseReceiver.receive(false); - if(responseMessage != null) - { - responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn); - received++; - } - } - } - - if(txn != null) - { - txn.commit(); - } - - - while(received < getCount()) - { - Message responseMessage = responseReceiver.receive(); - responseReceiver.acknowledge(responseMessage.getDeliveryTag()); - received++; - } - - - - - s.close(); - session.close(); - conn.close(); - - if(session2 != null) - { - session2.close(); - conn2.close(); - } - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return true; - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java deleted file mode 100644 index 8d9de4893f..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -import java.util.*; - -public class Respond extends Util -{ - private static final String USAGE_STRING = "respond [options]
\n\nOptions:"; - private Connection _conn; - private Session _session; - private Receiver _receiver; - private Transaction _txn; - private Map _senders; - private UnsignedLong _responseMsgId = UnsignedLong.ZERO; - private Connection _conn2; - private Session _session2; - - public Respond(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return true; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasSingleLinkPerConnectionMode() - { - return true; - } - - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - public static void main(String[] args) - { - new Respond(args).run(); - } - - public void run() - { - try - { - - _senders = new HashMap(); - - final String queue = getArgs()[0]; - - String message = ""; - - _conn = newConnection(); - - - - if(isUseMultipleConnections()) - { - _conn2 = newConnection(); - _session2 = _conn2.createSession(); - } - - - _session = _conn.createSession(); - - - _receiver = _session.createReceiver(queue, getMode()); - _txn = null; - - int credit = 0; - int receivedCount = 0; - _responseMsgId = UnsignedLong.ZERO; - - Random random = null; - int batch = 0; - List txnMessages = null; - if(useTran()) - { - if(getRollbackRatio() != 0) - { - random = new Random(); - } - batch = getBatchSize(); - _txn = _session.createSessionLocalTransaction(); - txnMessages = new ArrayList(batch); - } - - - for(int i = 0; receivedCount < getCount(); i++) - { - - if(credit == 0) - { - if(getCount() - i <= getWindowSize()) - { - credit = getCount() - i; - - } - else - { - credit = getWindowSize(); - - } - - _receiver.setCredit(UnsignedInteger.valueOf(credit), false); - - if(!isBlock()) - _receiver.drain(); - } - - Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L); - credit--; - if(m==null) - { - if(useTran() && batch != getBatchSize()) - { - _txn.commit(); - } - break; - } - - System.out.println("Received Message: " + m.getPayload()); - - respond(m); - - - - if(useTran()) - { - - txnMessages.add(m); - - if(--batch == 0) - { - - if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio()) - { - _txn.commit(); - txnMessages.clear(); - receivedCount += getBatchSize(); - } - else - { - System.out.println("Random Rollback"); - _txn.rollback(); - double result; - do - { - _txn = _session.createSessionLocalTransaction(); - - for(Message msg : txnMessages) - { - respond(msg); - } - - result = random.nextDouble(); - if(result sections = m.getPayload(); - String replyTo = null; - Object correlationId = null; - for(Section section : sections) - { - if(section instanceof Properties) - { - replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue(); - correlationId = ((Properties) section).getMessageId(); - break; - } - } - - if(replyTo != null) - { - Sender s = _senders.get(replyTo); - if(s == null) - { - s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize()); - _senders.put(replyTo, s); - } - - List
replySections = new ArrayList
(sections); - - ListIterator
sectionIterator = replySections.listIterator(); - - while(sectionIterator.hasNext()) - { - Section section = sectionIterator.next(); - if(section instanceof Properties) - { - Properties newProps = new Properties(); - newProps.setTo(replyTo); - newProps.setCorrelationId(correlationId); - newProps.setMessageId(_responseMsgId); - _responseMsgId = _responseMsgId.add(UnsignedLong.ONE); - sectionIterator.set(newProps); - } - } - - Message replyMessage = new Message(replySections); - System.out.println("Sent Message: " + replySections); - s.send(replyMessage, _txn); - - } - _receiver.acknowledge(m.getDeliveryTag(), _txn); - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java deleted file mode 100644 index 6f6575e083..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.LineNumberReader; -import java.util.Arrays; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -public class Send extends Util -{ - private static final String USAGE_STRING = "send [options]
[ ...]\n\nOptions:"; - private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'}; - - - public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, Connection.ConnectionException - { - new Send(args).run(); - } - - - public Send(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return true; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return true; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - @Override - protected boolean hasSubjectOption() - { - return true; - } - - public void run() - { - - final String queue = getArgs()[0]; - - String message = ""; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - - Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName()); - - Transaction txn = null; - - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - } - - if(!useStdIn()) - { - if(getArgs().length <= 2) - { - if(getArgs().length == 2) - { - message = getArgs()[1]; - } - for(int i = 0; i < getCount(); i++) - { - - Properties properties = new Properties(); - properties.setMessageId(UnsignedLong.valueOf(i)); - if(getSubject() != null) - { - properties.setSubject(getSubject()); - } - Section bodySection; - byte[] bytes = (message + " " + i).getBytes(); - if(bytes.length < getMessageSize()) - { - byte[] origBytes = bytes; - bytes = new byte[getMessageSize()]; - System.arraycopy(origBytes,0,bytes,0,origBytes.length); - for(int x = origBytes.length; x < bytes.length; x++) - { - bytes[x] = (byte) '.'; - } - bodySection = new Data(new Binary(bytes)); - } - else - { - bodySection = new AmqpValue(message + " " + i); - } - - Section[] sections = {properties, bodySection}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1, txn); - } - } - else - { - for(int i = 1; i < getArgs().length; i++) - { - s.send(new Message(getArgs()[i]), txn); - } - - } - } - else - { - LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in)); - - - try - { - while((message = buf.readLine()) != null) - { - s.send(new Message(message), txn); - } - } - catch (IOException e) - { - // TODO - e.printStackTrace(); - } - } - - if(txn != null) - { - txn.commit(); - } - - s.close(); - - session.close(); - conn.close(); - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - - - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java deleted file mode 100644 index 6f97ecd810..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.codec.FrameWriter; -import org.apache.qpid.amqp_1_0.codec.ValueWriter; -import org.apache.qpid.amqp_1_0.framing.AMQFrame; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.FrameBody; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedByte; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.UnsignedShort; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.Footer; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.qpid.amqp_1_0.type.transport.Flow; - -import org.apache.qpid.amqp_1_0.type.transport.Transfer; - -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; - -public class SendBytes -{ - - public static void main(String[] args) throws - Sender.SenderCreationException, - Sender.SenderClosingException, - Connection.ConnectionException, - IOException, ParseException - { - Transfer xfr = new Transfer(); - Flow fs = new Flow(); - fs.setIncomingWindow(UnsignedInteger.valueOf(1024)); - fs.setDeliveryCount(UnsignedInteger.valueOf(2)); - fs.setLinkCredit(UnsignedInteger.valueOf(18)); - fs.setAvailable(UnsignedInteger.valueOf(0)); - fs.setDrain(false); - - xfr.setHandle(UnsignedInteger.valueOf(0)); - xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes())); - //xfr.setDeliveryTag(new Binary(new byte[] {0})); - xfr.setDeliveryId(UnsignedInteger.valueOf(0)); - xfr.setSettled(true); - - - Header h = new Header(); - Properties p = new Properties(); - p.setTo("queue"); - //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes())); - - Footer f = new Footer(Collections.EMPTY_MAP); - - Section[] sections = new Section[] { h,p,f}; - //Section[] sections = new Section[] { b }; - //Section[] sections = { h,p, b}; -/* - Fragment[] fragments = new Fragment[5]; - - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer(); - - SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry); - - int num = 0; - int i = 0; - for(Section s : sections) - { - Fragment frag = new Fragment(); - - frag.setPayload(s.encode(encoder)); - frag.setFirst(true); - frag.setLast(true); - frag.setSectionCode(s.getSectionCode()); - frag.setSectionNumber(UnsignedInteger.valueOf(num++)); - frag.setSectionOffset(UnsignedLong.valueOf(0L)); - fragments[i++] =frag; - } - - xfr.setFragments(fragments); -*/ - - encodeTypes("xfr",xfr); - - final byte[] result; - final Object input = xfr; -/* - result = encode(1024, input); - - boolean ok = true; - - for(int j = 10; ok && j < 400; j++) - { - - byte[] result2 = encode(j,input); - - for(int i = 0; i <400; i++) - { - if(result[i] != result2[i]) - { - System.out.println("result differs at " + i + " Splitting at " + j+ " [" + result[i] + " - " + result2[i] + "]"); - //break; - //ok = false; - - } - } - }*/ - //System.out.println(Arrays.equals(result, result2)); - - //doEncodes(); - /*OutputStream out = System.out; - if(args.length > 0) - { - out = new FileOutputStream(args[0]); - } - - Transfer xfr = new Transfer(); - fs.setSessionCredit(UnsignedInteger.valueOf(1024)); - fs.setTransferCount(UnsignedInteger.valueOf(2)); - fs.setLinkCredit(UnsignedInteger.valueOf(18)); - fs.setAvailable(UnsignedInteger.valueOf(0)); - fs.setDrain(false); - - xfr.setHandle(UnsignedInteger.valueOf(0)); - //xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes())); - xfr.setDeliveryTag(new Binary(new byte[] {0})); - xfr.setTransferId(UnsignedInteger.valueOf(0)); - xfr.setSettled(true); - xfr.setFlowState(fs); - - Header h = new Header(); - h.setTransmitTime(new Date(System.currentTimeMillis())); - Properties p = new Properties(); - p.setTo(new Address("queue")); - //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes())); - AmqpMapSection m = new AmqpMapSection(); - DataSection b = new DataSection("Hello World!".getBytes()); - - Footer f = new Footer(); - - Section[] sections = new Section[] { h,p,m,b,f}; - //Section[] sections = new Section[] { b }; - //Section[] sections = { h,p, b}; - List fragments = new ArrayList(5); - - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - - SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry); - - for(Section s : sections) - { - Fragment frag = new Fragment(); - - frag.setPayload(s.encode(encoder)); - frag.setFirst(true); - frag.setLast(true); - frag.setFormatCode(s.getSectionCode()); - frag.setFragmentOffset(null); - fragments.add(frag); - } - - xfr.setFragments(fragments); - - - Object[] objectsToWrite = new Object[] { xfr }; - ByteBuffer buf = ByteBuffer.allocate(4096); - - - for(Object obj : objectsToWrite) - { - ValueWriter writer = typeRegistry.getValueWriter(obj); - - int count; - - - do - { - count = writer.writeToBuffer(buf); - out.write(buf.array(), buf.arrayOffset(), count); - buf.clear(); - } while (!writer.isComplete()); - - } - - out.flush(); - out.close();*/ - - } - - public static void doEncodes() throws IOException, ParseException - { - encodeTypes("boolean", Boolean.TRUE, Boolean.FALSE); - encodeTypes("ubyte", UnsignedByte.valueOf((byte)0), UnsignedByte.valueOf((byte)1 ),UnsignedByte.valueOf((byte)3), UnsignedByte.valueOf((byte)42), UnsignedByte.valueOf("255")); - encodeTypes("byte", Byte.valueOf((byte)0), Byte.valueOf( (byte)1), Byte.valueOf((byte) 3), Byte.valueOf((byte) 42), Byte.valueOf((byte) 127), Byte.valueOf((byte) -1), Byte.valueOf((byte) -3), Byte.valueOf((byte) -42), Byte.valueOf( (byte)-128)); - encodeTypes("ushort", UnsignedShort.valueOf((short)0), UnsignedShort.valueOf((short)1), UnsignedShort.valueOf((short)3), UnsignedShort.valueOf((short)42), UnsignedShort.valueOf("65535")); - encodeTypes("short", Short.valueOf((short)0), Short.valueOf((short)1), Short.valueOf((short)3), Short.valueOf((short)42), Short.valueOf((short)32767), Short.valueOf((short)-1), Short.valueOf((short)-3), Short.valueOf((short)-42), Short.valueOf((short)-32768)); - encodeTypes("uint",UnsignedInteger.valueOf(0), UnsignedInteger.valueOf(1), UnsignedInteger.valueOf(3), UnsignedInteger.valueOf(42), UnsignedInteger.valueOf("4294967295")); - encodeTypes("int", 0, 1, 3, 42, 2147483647, -1, -3, -42, -2147483648); - encodeTypes("ulong", UnsignedLong.valueOf(0), UnsignedLong.valueOf(1), UnsignedLong.valueOf(3), UnsignedLong.valueOf(42), UnsignedLong.valueOf("18446744073709551615")); - encodeTypes("long", 0l, 1l, 3l, 42l, 9223372036854775807l, -1l, -3l, -42l, -9223372036854775808l); - encodeTypes("float", 3.14159); - encodeTypes("double", Double.valueOf(3.14159265359)); - encodeTypes("char", '?'); - - SimpleDateFormat df = new SimpleDateFormat("HHa z MMM d yyyy"); - - encodeTypes("timestamp", df.parse("9AM PST Dec 6 2010"), df.parse("9AM PST Dec 6 1910")); - encodeTypes("uuid", UUID.fromString("f275ea5e-0c57-4ad7-b11a-b20c563d3b71")); - encodeTypes("binary", new Binary( new byte[] {(byte)0xDE, (byte)0xAD, (byte)0xBE, (byte)0xEF}), new Binary(new byte[] { (byte)0xCA,(byte)0xFE, (byte)0xBA, (byte)0xBE})); - encodeTypes("string", "The quick brown fox jumped over the lazy cow."); - encodeTypes("symbol", Symbol.valueOf("connectathon")); - encodeTypes("list", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE})); - Map map = new HashMap(); - map.put("one", Long.valueOf(1)); - map.put("two", Long.valueOf(2)); - map.put("pi", Double.valueOf(3.14159265359)); - map.put("list:", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE})); - map.put(null, Boolean.TRUE); - encodeTypes("map", map); - encodeTypes("null", null); - - } - - static void encodeTypes(String name, Object... vals ) throws IOException - { - FileOutputStream out = new FileOutputStream("/home/rob/"+name+".out"); - ByteBuffer buf = ByteBuffer.allocate(4096); - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - - if(vals != null) - { - for(Object obj : vals) - { - ValueWriter writer = typeRegistry.getValueWriter(obj); - - int count; - - - do - { - count = writer.writeToBuffer(buf); - out.write(buf.array(), buf.arrayOffset(), count); - buf.clear(); - } while (!writer.isComplete()); - - } - } - else - { - ValueWriter writer = typeRegistry.getValueWriter(null); - - int count; - - - do - { - count = writer.writeToBuffer(buf); - out.write(buf.array(), buf.arrayOffset(), count); - buf.clear(); - } while (!writer.isComplete()); - - } - out.flush(); - out.close(); - - } - - static byte[] encode(int size, Object... vals) - { - byte[] result = new byte[10000]; - int pos = 0; - - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - AMQFrame frame = AMQFrame.createAMQFrame((short) 0, (FrameBody) vals[0]); - FrameWriter writer = new FrameWriter(typeRegistry); - /*for(Object obj : vals) - { - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - ValueWriter writer = typeRegistry.getValueWriter(obj); -*/ - int count; - - ByteBuffer buf = ByteBuffer.wrap(result, pos, size); - - do - { - - writer.writeToBuffer(buf); - pos = buf.position(); - buf = ByteBuffer.wrap(result, pos, size); - if(!writer.isComplete()) - { - count = 3; - } - - } while (!writer.isComplete()); -/* - - } -*/ - - return result; - - } - - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java deleted file mode 100644 index 6fe2a6d510..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java +++ /dev/null @@ -1,529 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.commons.cli.*; - -import java.util.logging.*; - -public abstract class Util -{ - - private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); - private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); - private String _host; - private String _username; - private String _password; - private int _port; - private int _count; - private boolean _useStdIn; - private boolean _useTran; - private String[] _args; - private AcknowledgeMode _mode; - private boolean _block; - private int _frameSize; - private int _messageSize; - private String _responseQueue; - private int _batchSize; - private double _rollbackRatio; - private String _linkName; - private String _containerName; - private boolean _durableLink; - private boolean _useMultipleConnections; - private int _windowSize = 100; - private String _subject; - private String _filter; - private String _remoteHost; - private boolean _useSSL; - - protected Util(String[] args) - { - CommandLineParser cmdLineParse = new PosixParser(); - - Options options = new Options(); - options.addOption("h","help",false,"show this help message and exit"); - options.addOption(OptionBuilder.withLongOpt("host") - .withDescription( "host to connect to (default 0.0.0.0)" ) - .hasArg(true) - .withArgName("HOST") - .create('H')); - options.addOption(OptionBuilder.withLongOpt("username") - .withDescription( "username to use for authentication" ) - .hasArg(true) - .withArgName("USERNAME") - .create('u')); - options.addOption(OptionBuilder.withLongOpt("password") - .withDescription( "password to use for authentication" ) - .hasArg(true) - .withArgName("PASSWORD") - .create('w')); - options.addOption(OptionBuilder.withLongOpt("port") - .withDescription( "port to connect to (default 5672)" ) - .hasArg(true) - .withArgName("PORT") - .create('p')); - options.addOption(OptionBuilder.withLongOpt("frame-size") - .withDescription( "specify the maximum frame size" ) - .hasArg(true) - .withArgName("FRAME_SIZE") - .create('f')); - options.addOption(OptionBuilder.withLongOpt("container-name") - .withDescription( "Container name" ) - .hasArg(true) - .withArgName("CONTAINER_NAME") - .create('C')); - - options.addOption(OptionBuilder.withLongOpt("ssl") - .withDescription("Use SSL") - .create('S')); - - options.addOption(OptionBuilder.withLongOpt("remote-hostname") - .withDescription( "hostname to supply in the open frame" ) - .hasArg(true) - .withArgName("HOST") - .create('O')); - - if(hasBlockOption()) - options.addOption(OptionBuilder.withLongOpt("block") - .withDescription("block until messages arrive") - .create('b')); - - if(hasCountOption()) - options.addOption(OptionBuilder.withLongOpt("count") - .withDescription( "number of messages to send (default 1)" ) - .hasArg(true) - .withArgName("COUNT") - .create('c')); - if(hasModeOption()) - options.addOption(OptionBuilder.withLongOpt("acknowledge-mode") - .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" ) - .hasArg(true) - .withArgName("MODE") - .create('k')); - - if(hasSubjectOption()) - options.addOption(OptionBuilder.withLongOpt("subject") - .withDescription( "subject message property" ) - .hasArg(true) - .withArgName("SUBJECT") - .create('s')); - - - if(hasSingleLinkPerConnectionMode()) - options.addOption(OptionBuilder.withLongOpt("single-link-per-connection") - .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once") - .hasArg(false) - .create('Z')); - - if(hasFilterOption()) - options.addOption(OptionBuilder.withLongOpt("filter") - .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#") - .hasArg(true) - .withArgName("=") - .create('F')); - - - if(hasTxnOption()) - { - options.addOption("x","txn",false,"use transactions"); - options.addOption(OptionBuilder.withLongOpt("batch-size") - .withDescription( "transaction batch size (default: 1)" ) - .hasArg(true) - .withArgName("BATCH-SIZE") - .create('B')); - options.addOption(OptionBuilder.withLongOpt("rollback-ratio") - .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" ) - .hasArg(true) - .withArgName("RATIO") - .create('R')); - } - - if(hasLinkDurableOption()) - { - options.addOption("d","durable-link",false,"use a durable link"); - } - - if(hasStdInOption()) - options.addOption("i","stdin",false,"read messages from stdin (one message per line)"); - - options.addOption(OptionBuilder.withLongOpt("trace") - .withDescription("trace logging specified categories: RAW, FRM") - .hasArg(true) - .withArgName("TRACE") - .create('t')); - if(hasSizeOption()) - options.addOption(OptionBuilder.withLongOpt("message-size") - .withDescription( "size to pad outgoing messages to" ) - .hasArg(true) - .withArgName("SIZE") - .create('z')); - - if(hasResponseQueueOption()) - options.addOption(OptionBuilder.withLongOpt("response-queue") - .withDescription( "response queue to reply to" ) - .hasArg(true) - .withArgName("RESPONSE_QUEUE") - .create('r')); - - if(hasLinkNameOption()) - { - options.addOption(OptionBuilder.withLongOpt("link") - .withDescription( "link name" ) - .hasArg(true) - .withArgName("LINK") - .create('l')); - } - - if(hasWindowSizeOption()) - { - options.addOption(OptionBuilder.withLongOpt("window-size") - .withDescription("credit window size") - .hasArg(true) - .withArgName("WINDOW-SIZE") - .create('W')); - } - - CommandLine cmdLine = null; - try - { - cmdLine = cmdLineParse.parse(options, args); - - } - catch (ParseException e) - { - printUsage(options); - System.exit(-1); - } - - if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty()) - { - printUsage(options); - System.exit(0); - } - _host = cmdLine.getOptionValue('H',"0.0.0.0"); - _remoteHost = cmdLine.getOptionValue('O',null); - String portStr = cmdLine.getOptionValue('p',"5672"); - String countStr = cmdLine.getOptionValue('c',"1"); - - _useSSL = cmdLine.hasOption('S'); - - if(hasWindowSizeOption()) - { - String windowSizeStr = cmdLine.getOptionValue('W',"100"); - _windowSize = Integer.parseInt(windowSizeStr); - } - - if(hasSubjectOption()) - { - _subject = cmdLine.getOptionValue('s'); - } - - if(cmdLine.hasOption('u')) - { - _username = cmdLine.getOptionValue('u'); - } - - if(cmdLine.hasOption('w')) - { - _password = cmdLine.getOptionValue('w'); - } - - if(cmdLine.hasOption('F')) - { - _filter = cmdLine.getOptionValue('F'); - } - - _port = Integer.parseInt(portStr); - - _containerName = cmdLine.getOptionValue('C'); - - if(hasBlockOption()) - _block = cmdLine.hasOption('b'); - - if(hasLinkNameOption()) - _linkName = cmdLine.getOptionValue('l'); - - - if(hasLinkDurableOption()) - _durableLink = cmdLine.hasOption('d'); - - if(hasCountOption()) - _count = Integer.parseInt(countStr); - - if(hasStdInOption()) - _useStdIn = cmdLine.hasOption('i'); - - if(hasSingleLinkPerConnectionMode()) - _useMultipleConnections = cmdLine.hasOption('Z'); - - if(hasTxnOption()) - { - _useTran = cmdLine.hasOption('x'); - _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1")); - _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0")); - } - - if(hasModeOption()) - { - _mode = AcknowledgeMode.ALO; - - if(cmdLine.hasOption('k')) - { - _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k')); - } - } - - if(hasResponseQueueOption()) - { - _responseQueue = cmdLine.getOptionValue('r'); - } - - _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536")); - - if(hasSizeOption()) - { - _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1")); - } - - String categoriesList = cmdLine.getOptionValue('t'); - String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]"); - for(String cat : categories) - { - if(cat.equalsIgnoreCase("FRM")) - { - FRAME_LOGGER.setLevel(Level.FINE); - Formatter formatter = new Formatter() - { - @Override - public String format(final LogRecord record) - { - return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n"; - } - }; - for(Handler handler : FRAME_LOGGER.getHandlers()) - { - FRAME_LOGGER.removeHandler(handler); - } - Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); - FRAME_LOGGER.addHandler(handler); - } - else if (cat.equalsIgnoreCase("RAW")) - { - RAW_LOGGER.setLevel(Level.FINE); - Formatter formatter = new Formatter() - { - @Override - public String format(final LogRecord record) - { - return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n"; - } - }; - for(Handler handler : RAW_LOGGER.getHandlers()) - { - RAW_LOGGER.removeHandler(handler); - } - Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); - RAW_LOGGER.addHandler(handler); - } - } - - - _args = cmdLine.getArgs(); - - } - - protected boolean hasFilterOption() - { - return false; - } - - protected boolean hasSubjectOption() - { - return false; - } - - protected boolean hasWindowSizeOption() - { - return false; - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return false; - } - - protected abstract boolean hasLinkDurableOption(); - - protected abstract boolean hasLinkNameOption(); - - protected abstract boolean hasResponseQueueOption(); - - protected abstract boolean hasSizeOption(); - - protected abstract boolean hasBlockOption(); - - protected abstract boolean hasStdInOption(); - - protected abstract boolean hasTxnOption(); - - protected abstract boolean hasModeOption(); - - protected abstract boolean hasCountOption(); - - public String getHost() - { - return _host; - } - - public String getUsername() - { - return _username; - } - - public String getPassword() - { - return _password; - } - - public int getPort() - { - return _port; - } - - public int getCount() - { - return _count; - } - - public boolean useStdIn() - { - return _useStdIn; - } - - public boolean useTran() - { - return _useTran; - } - - public AcknowledgeMode getMode() - { - return _mode; - } - - public boolean isBlock() - { - return _block; - } - - public String[] getArgs() - { - return _args; - } - - public int getMessageSize() - { - return _messageSize; - } - - public String getResponseQueue() - { - return _responseQueue; - } - - public int getBatchSize() - { - return _batchSize; - } - - public double getRollbackRatio() - { - return _rollbackRatio; - } - - public String getLinkName() - { - return _linkName; - } - - public boolean isDurableLink() - { - return _durableLink; - } - - public boolean isUseMultipleConnections() - { - return _useMultipleConnections; - } - - public void setUseMultipleConnections(boolean useMultipleConnections) - { - _useMultipleConnections = useMultipleConnections; - } - - public String getSubject() - { - return _subject; - } - - public void setSubject(String subject) - { - _subject = subject; - } - - protected abstract void printUsage(final Options options); - - protected abstract void run(); - - - public Connection newConnection() throws Connection.ConnectionException - { - Container container = getContainerName() == null ? new Container() : new Container(getContainerName()); - return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container, - _remoteHost == null ? getHost() : _remoteHost, _useSSL) - : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize, - container, _remoteHost == null ? getHost() : _remoteHost, _useSSL); - } - - public String getContainerName() - { - return _containerName; - } - - public int getWindowSize() - { - return _windowSize; - } - - public void setWindowSize(int windowSize) - { - _windowSize = windowSize; - } - - public String getFilter() - { - return _filter; - } -} -- cgit v1.2.1