From 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 28 Feb 2013 16:14:30 +0000 Subject: Update from trunk r1375509 through r1450773 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68 --- java/amqp-1-0-client/example/build.xml | 28 ++ .../org/apache/qpid/amqp_1_0/client/Command.java | 43 ++ .../java/org/apache/qpid/amqp_1_0/client/Demo.java | 407 ++++++++++++++++ .../java/org/apache/qpid/amqp_1_0/client/Dump.java | 136 ++++++ .../apache/qpid/amqp_1_0/client/Filereceiver.java | 347 ++++++++++++++ .../apache/qpid/amqp_1_0/client/Filesender.java | 296 ++++++++++++ .../org/apache/qpid/amqp_1_0/client/Receive.java | 246 ++++++++++ .../org/apache/qpid/amqp_1_0/client/Request.java | 249 ++++++++++ .../org/apache/qpid/amqp_1_0/client/Respond.java | 347 ++++++++++++++ .../java/org/apache/qpid/amqp_1_0/client/Send.java | 244 ++++++++++ .../java/org/apache/qpid/amqp_1_0/client/Util.java | 529 +++++++++++++++++++++ 11 files changed, 2872 insertions(+) create mode 100644 java/amqp-1-0-client/example/build.xml create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java create mode 100644 java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java (limited to 'java/amqp-1-0-client/example') diff --git a/java/amqp-1-0-client/example/build.xml b/java/amqp-1-0-client/example/build.xml new file mode 100644 index 0000000000..89bba729dd --- /dev/null +++ b/java/amqp-1-0-client/example/build.xml @@ -0,0 +1,28 @@ + + + + + + + + + diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java new file mode 100644 index 0000000000..3bb26744c4 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import java.lang.reflect.InvocationTargetException; + +public class Command +{ + public static void main(String[] args) throws + ClassNotFoundException, + NoSuchMethodException, + InvocationTargetException, + IllegalAccessException, + InstantiationException + { + String name = args[0]; + String[] cmdArgs = new String[args.length-1]; + System.arraycopy(args,1,cmdArgs,0,args.length-1); + name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase(); + Class clazz = (Class) Class.forName(name); + Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs); + util.run(); + + } +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java new file mode 100644 index 0000000000..b58ce6bfe5 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java @@ -0,0 +1,407 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Demo extends Util +{ + private static final String USAGE_STRING = "demo [options] [ ...]\n\nOptions:"; + private static final String OPCODE = "opcode"; + private static final String ACTION = "action"; + private static final String MESSAGE_ID = "message-id"; + private static final String VENDOR = "vendor"; + private static final String LOG = "log"; + private static final String RECEIVED = "received"; + private static final String TEST = "test"; + private static final String APACHE = "apache"; + private static final String SENT = "sent"; + private static final String LINK_REF = "link-ref"; + private static final String HOST = "host"; + private static final String PORT = "port"; + private static final String SASL_USER = "sasl-user"; + private static final String SASL_PASSWORD = "sasl-password"; + private static final String ROLE = "role"; + private static final String ADDRESS = "address"; + private static final String SENDER = "sender"; + private static final String SEND_MESSAGE = "send-message"; + private static final String ANNOUNCE = "announce"; + private static final String MESSAGE_VENDOR = "message-vendor"; + private static final String CREATE_LINK = "create-link"; + + public static void main(String[] args) + { + new Demo(args).run(); + } + + public Demo(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected boolean hasWindowSizeOption() + { + return false; + } + + public void run() + { + + try + { + + final String vendor = getArgs()[0]; + final String queue = "control"; + + String message = ""; + + Connection conn = newConnection(); + Session session = conn.createSession(); + + + Receiver responseReceiver; + + responseReceiver = session.createTemporaryQueueReceiver(); + + + + + responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + + + Sender s = session.createSender(queue, getWindowSize(), getMode()); + + + Properties properties = new Properties(); + properties.setMessageId(java.util.UUID.randomUUID()); + properties.setReplyTo(responseReceiver.getAddress()); + + HashMap appPropMap = new HashMap(); + ApplicationProperties appProperties = new ApplicationProperties(appPropMap); + + appPropMap.put(OPCODE, ANNOUNCE); + appPropMap.put(VENDOR, vendor); + appPropMap.put(ADDRESS,responseReceiver.getAddress()); + + AmqpValue amqpValue = new AmqpValue(message); + Section[] sections = { properties, appProperties, amqpValue}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1); + + Map sendingLinks = new HashMap(); + Map receivingLinks = new HashMap(); + + + boolean done = false; + + while(!done) + { + boolean wait = true; + Message m = responseReceiver.receive(false); + if(m != null) + { + List
payload = m.getPayload(); + wait = false; + ApplicationProperties props = m.getApplicationProperties(); + Map map = props.getValue(); + String op = (String) map.get(OPCODE); + if("reset".equals(op)) + { + for(Sender sender : sendingLinks.values()) + { + try + { + sender.close(); + Session session1 = sender.getSession(); + session1.close(); + session1.getConnection().close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + for(Receiver receiver : receivingLinks.values()) + { + try + { + receiver.close(); + receiver.getSession().close(); + receiver.getSession().getConnection().close(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + sendingLinks.clear(); + receivingLinks.clear(); + } + else if(CREATE_LINK.equals(op)) + { + Object linkRef = map.get(LINK_REF); + String host = (String) map.get(HOST); + Object o = map.get(PORT); + int port = Integer.parseInt(String.valueOf(o)); + String user = (String) map.get(SASL_USER); + String password = (String) map.get(SASL_PASSWORD); + String role = (String) map.get(ROLE); + String address = (String) map.get(ADDRESS); + System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password); + try{ + + + Connection conn2 = new Connection(host, port, user, password, host); + Session session2 = conn2.createSession(); + if(sendingLinks.containsKey(linkRef)) + { + try + { + sendingLinks.remove(linkRef).close(); + } + catch (Exception e) + { + + } + } + if(receivingLinks.containsKey(linkRef)) + { + try + { + receivingLinks.remove(linkRef).close(); + } + catch (Exception e) + { + + } + } + if(SENDER.equals(role)) + { + + System.err.println("%%% Creating sender (" + linkRef + ")"); + Sender sender = session2.createSender(address); + sendingLinks.put(linkRef, sender); + } + else + { + + System.err.println("%%% Creating receiver (" + linkRef + ")"); + Receiver receiver2 = session2.createReceiver(address); + receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + receivingLinks.put(linkRef, receiver2); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + else if(SEND_MESSAGE.equals(op)) + { + Sender sender = sendingLinks.get(map.get(LINK_REF)); + Properties m2props = new Properties(); + Object messageId = map.get(MESSAGE_ID); + m2props.setMessageId(messageId); + Map m2propmap = new HashMap(); + m2propmap.put(OPCODE, TEST); + m2propmap.put(VENDOR, vendor); + ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); + Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); + sender.send(m2); + + Map m3propmap = new HashMap(); + m3propmap.put(OPCODE, LOG); + m3propmap.put(ACTION, SENT); + m3propmap.put(MESSAGE_ID, messageId); + m3propmap.put(VENDOR, vendor); + m3propmap.put(MESSAGE_VENDOR, vendor); + + + Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), + new AmqpValue("AMQP-"+messageId))); + s.send(m3); + + } + + responseReceiver.acknowledge(m); + } + else + { + for(Map.Entry entry : receivingLinks.entrySet()) + { + m = entry.getValue().receive(false); + if(m != null) + { + wait = false; + + System.err.println("%%% Received message from " + entry.getKey()); + + Properties mp = m.getProperties(); + ApplicationProperties ap = m.getApplicationProperties(); + + Map m3propmap = new HashMap(); + m3propmap.put(OPCODE, LOG); + m3propmap.put(ACTION, RECEIVED); + m3propmap.put(MESSAGE_ID, mp.getMessageId()); + m3propmap.put(VENDOR, vendor); + m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR)); + + Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), + new AmqpValue("AMQP-"+mp.getMessageId()))); + s.send(m3); + + entry.getValue().acknowledge(m); + } + + } + } + + if(wait) + { + try + { + Thread.sleep(500l); + } + catch (InterruptedException e) + { + e.printStackTrace(); //TODO. + } + } + + } + + + + + + + + + + s.close(); + session.close(); + conn.close(); + + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return false; + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java new file mode 100644 index 0000000000..65d27b21f8 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.commons.cli.Options; + +public class Dump extends Util +{ + private static final String USAGE_STRING = "dump [options]
\n\nOptions:"; + + + protected Dump(String[] args) + { + super(args); + } + + public static void main(String[] args) + { + new Dump(args).run(); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasLinkNameOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasSizeOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasBlockOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasStdInOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasTxnOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasModeOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasCountOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected void printUsage(Options options) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + + Sender s = session.createSender(queue, 10); + + Message message = new Message("dump me"); + message.setDeliveryTag(new Binary("dump".getBytes())); + + s.send(message); + + s.close(); + session.close(); + conn.close(); + + } catch (Connection.ConnectionException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java new file mode 100644 index 0000000000..4d98655ad2 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java @@ -0,0 +1,347 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; + +public class Filereceiver extends Util +{ + private static final String USAGE_STRING = "filereceiver [options]
\n\nOptions:"; + + protected Filereceiver(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return false; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + final String directoryName = getArgs()[1]; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + final File directory = new File(directoryName); + if(directory.isDirectory() && directory.canWrite()) + { + File tmpDirectory = new File(directoryName, ".tmp"); + if(!tmpDirectory.exists()) + { + tmpDirectory.mkdir(); + } + + String[] unsettledFiles = tmpDirectory.list(); + + Map unsettled = new HashMap(); + final Map unsettledFileNames = new HashMap(); + + Accepted accepted = new Accepted(); + + for(String fileName : unsettledFiles) + { + File theFile = new File(tmpDirectory, fileName); + if(theFile.isFile()) + { + if(fileName.startsWith("~") && fileName.endsWith("~")) + { + theFile.delete(); + } + else + { + int splitPoint = fileName.indexOf("."); + String deliveryTagStr = fileName.substring(0,splitPoint); + String actualFileName = fileName.substring(splitPoint+1); + + byte[] bytes = new byte[deliveryTagStr.length()/2]; + + + for(int i = 0; i < bytes.length; i++) + { + char c = deliveryTagStr.charAt(2*i); + char d = deliveryTagStr.charAt(1+(2*i)); + + bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) + | (d <= '9' ? d - '0' : d - 'W')); + + } + Binary deliveryTag = new Binary(bytes); + unsettled.put(deliveryTag, accepted); + unsettledFileNames.put(deliveryTag, fileName); + } + } + + } + + Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), + unsettled); + + Map remoteUnsettled = r.getRemoteUnsettled(); + + for(Map.Entry entry : unsettledFileNames.entrySet()) + { + if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) + { + + File tmpFile = new File(tmpDirectory, entry.getValue()); + final File dest = new File(directory, + entry.getValue().substring(entry.getValue().indexOf(".") + 1)); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + + tmpFile.renameTo(dest); + } + } + + + int credit = 10; + + r.setCredit(UnsignedInteger.valueOf(credit), true); + + + int received = 0; + Message m = null; + do + { + m = isBlock() && received == 0 ? r.receive() : r.receive(10000); + if(m != null) + { + if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) + { + final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); + final File unsettledFile = new File(tmpDirectory, + tmpFileName); + r.acknowledge(m, new Receiver.SettledAction() + { + public void onSettled(final Binary deliveryTag) + { + int splitPoint = tmpFileName.indexOf("."); + + String fileName = tmpFileName.substring(splitPoint+1); + + final File dest = new File(directory, fileName); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + unsettledFile.renameTo(dest); + unsettledFileNames.remove(deliveryTag); + } + }); + } + else + { + received++; + List
sections = m.getPayload(); + Binary deliveryTag = m.getDeliveryTag(); + StringBuilder tagNameBuilder = new StringBuilder(); + + ByteBuffer dtbuf = deliveryTag.asByteBuffer(); + while(dtbuf.hasRemaining()) + { + tagNameBuilder.append(String.format("%02x", dtbuf.get())); + } + + + ApplicationProperties properties = null; + List data = new ArrayList(); + int totalSize = 0; + for(Section section : sections) + { + if(section instanceof ApplicationProperties) + { + properties = (ApplicationProperties) section; + } + else if(section instanceof AmqpValue) + { + AmqpValue value = (AmqpValue) section; + if(value.getValue() instanceof Binary) + { + Binary binary = (Binary) value.getValue(); + data.add(binary); + totalSize += binary.getLength(); + + } + else + { + // TODO exception + } + } + else if(section instanceof Data) + { + Data value = (Data) section; + Binary binary = value.getValue(); + data.add(binary); + totalSize += binary.getLength(); + + } + } + if(properties != null) + { + final String fileName = (String) properties.getValue().get("filename"); + byte[] fileData = new byte[totalSize]; + ByteBuffer buf = ByteBuffer.wrap(fileData); + int offset = 0; + for(Binary bin : data) + { + buf.put(bin.asByteBuffer()); + } + File outputFile = new File(tmpDirectory, "~"+fileName+"~"); + if(outputFile.exists()) + { + outputFile.delete(); + } + FileOutputStream fos = new FileOutputStream(outputFile); + fos.write(fileData); + fos.flush(); + fos.close(); + + final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + + fileName); + outputFile.renameTo(unsettledFile); + r.acknowledge(m, new Receiver.SettledAction() + { + public void onSettled(final Binary deliveryTag) + { + final File dest = new File(directory, fileName); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + unsettledFile.renameTo(dest); + + } + }); + + } + } + } + } + while(m != null); + + + r.close(); + } + else + { + System.err.println("No such directory: " + directoryName); + } + session.close(); + conn.close(); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); + } + catch (FileNotFoundException e) + { + e.printStackTrace(); //TODO. + } + catch (IOException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + public static void main(String[] args) + { + new Filereceiver(args).run(); + } +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java new file mode 100644 index 0000000000..46e6ba537f --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java @@ -0,0 +1,296 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; + +public class Filesender extends Util +{ + private static final String USAGE_STRING = "filesender [options]
\n\nOptions:"; + + protected Filesender(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return false; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + final String directoryName = getArgs()[1]; + + try + { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + Connection conn = newConnection(); + + Session session = conn.createSession(); + + File directory = new File(directoryName); + if(directory.isDirectory() && directory.canWrite()) + { + + File tmpDirectory = new File(directoryName, ".tmp"); + if(!tmpDirectory.exists()) + { + tmpDirectory.mkdir(); + } + + String[] unsettledFiles = tmpDirectory.list(); + + + + Map unsettled = new HashMap(); + Map unsettledFileNames = new HashMap(); + for(String fileName : unsettledFiles) + { + File aFile = new File(tmpDirectory, fileName); + if(aFile.canRead() && aFile.canWrite()) + { + Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); + unsettled.put(deliveryTag, null); + unsettledFileNames.put(deliveryTag, fileName); + } + } + + + Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(), + unsettled); + + Map remoteUnsettled = s.getRemoteUnsettled(); + + for(Map.Entry entry: unsettledFileNames.entrySet()) + { + if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) + { + (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue())); + } + } + + if(remoteUnsettled != null) + { + for(Map.Entry entry : remoteUnsettled.entrySet()) + { + if(entry.getValue() instanceof Accepted) + { + final String fileName = unsettledFileNames.get(entry.getKey()); + if(fileName != null) + { + + Message resumed = new Message(); + resumed.setDeliveryTag(entry.getKey()); + resumed.setDeliveryState(entry.getValue()); + resumed.setResume(Boolean.TRUE); + resumed.setSettled(Boolean.TRUE); + + + + final File unsettledFile = new File(tmpDirectory, fileName); + unsettledFile.delete(); + + s.send(resumed); + + } + + } + else if(entry.getValue() instanceof Received || entry.getValue() == null) + { + final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey())); + Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile); + resumed.setResume(Boolean.TRUE); + Sender.OutcomeAction action = new Sender.OutcomeAction() + { + public void onOutcome(Binary deliveryTag, Outcome outcome) + { + if(outcome instanceof Accepted) + { + unsettledFile.delete(); + } + } + }; + s.send(resumed, action); + + } + } + } + + + + String[] files = directory.list(); + + for(String fileName : files) + { + final File file = new File(directory, fileName); + + if(file.canRead() && file.canWrite() && !file.isDirectory()) + { + Message message = createMessageFromFile(md5, fileName, file); + + final File unsettledFile = new File(tmpDirectory, fileName); + + Sender.OutcomeAction action = new Sender.OutcomeAction() + { + public void onOutcome(Binary deliveryTag, Outcome outcome) + { + if(outcome instanceof Accepted) + { + unsettledFile.delete(); + } + } + }; + + file.renameTo(unsettledFile); + + s.send(message, action); + } + } + + s.close(); + } + else + { + System.err.println("No such directory: " + directory); + } + session.close(); + conn.close(); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); + } catch (FileNotFoundException e) + { + e.printStackTrace(); + } catch (IOException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (NoSuchAlgorithmException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + } + + private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException + { + FileInputStream fis = new FileInputStream(file); + byte[] data = new byte[(int) file.length()]; + + int read = fis.read(data); + + fis.close(); + + Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName)); + Section amqpValue = new Data(new Binary(data)); + Message message = new Message(Arrays.asList(applicationProperties, amqpValue)); + Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); + message.setDeliveryTag(deliveryTag); + md5.reset(); + return message; + } + + public static void main(String[] args) + { + new Filesender(args).run(); + } +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java new file mode 100644 index 0000000000..0da9dc3fb7 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java @@ -0,0 +1,246 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.commons.cli.*; +import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; +import org.apache.qpid.amqp_1_0.type.messaging.Filter; +import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; + +import java.util.Collections; + +public class Receive extends Util +{ + private static final String USAGE_STRING = "receive [options]
\n\nOptions:"; + private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L); + private UnsignedLong _lastCorrelationId; + + public static void main(String[] args) + { + new Receive(args).run(); + } + + + public Receive(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return true; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + @Override + protected boolean hasFilterOption() + { + return true; + } + + protected void run() + { + + try + { + final String queue = getArgs()[0]; + + String message = ""; + + Connection conn = newConnection(); + + + Session session = conn.createSession(); + + Filter filter = null; + if(getFilter() != null) + { + String[] filterParts = getFilter().split("=",2); + if("exact-subject".equals(filterParts[0])) + { + filter = new ExactSubjectFilter(filterParts[1]); + } + else if("matching-subject".equals(filterParts[0])) + { + filter = new MatchingSubjectFilter(filterParts[1]); + } + else + { + System.err.println("Unknown filter type: " + filterParts[0]); + } + } + + Receiver r = + filter == null + ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink()) + : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null); + Transaction txn = null; + + int credit = 0; + int receivedCount = 0; + + if(!useStdIn()) + { + if(getArgs().length <= 2) + { + + Transaction txn2 = null; + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + txn2 = session.createSessionLocalTransaction(); + } + + for(int i = 0; i < getCount(); i++) + { + + if(credit == 0) + { + if(getCount() - i <= getWindowSize()) + { + credit = getCount() - i; + + } + else + { + credit = getWindowSize(); + + } + + { + r.setCredit(UnsignedInteger.valueOf(credit), false); + } + if(!isBlock()) + r.drain(); + } + + Message m = isBlock() ? r.receive() : r.receive(1000L); + credit--; + if(m==null) + { + break; + } + + + + r.acknowledge(m.getDeliveryTag(),txn); + + receivedCount++; + + System.out.println("Received Message : " + m.getPayload()); + } + + if(useTran()) + { + txn.commit(); + } + } + else + { + // TODO + } + } + else + { + // TODO + } + r.close(); + session.close(); + conn.close(); + System.out.println("Total Messages Received: " + receivedCount); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java new file mode 100644 index 0000000000..6e1d15376c --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java @@ -0,0 +1,249 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +import java.util.Arrays; + +public class Request extends Util +{ + private static final String USAGE_STRING = "request [options]
[ ...]\n\nOptions:"; + + public static void main(String[] args) + { + new Request(args).run(); + } + + public Request(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return true; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + public void run() + { + + try + { + + + final String queue = getArgs()[0]; + + String message = ""; + + Connection conn = newConnection(); + Session session = conn.createSession(); + + Connection conn2; + Session session2; + Receiver responseReceiver; + + if(isUseMultipleConnections()) + { + conn2 = newConnection(); + session2 = conn2.createSession(); + responseReceiver = session2.createTemporaryQueueReceiver(); + } + else + { + conn2 = null; + session2 = null; + responseReceiver = session.createTemporaryQueueReceiver(); + } + + + + + responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); + + + + Sender s = session.createSender(queue, getWindowSize(), getMode()); + + Transaction txn = null; + + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + } + + int received = 0; + + if(getArgs().length >= 2) + { + message = getArgs()[1]; + if(message.length() < getMessageSize()) + { + StringBuilder builder = new StringBuilder(getMessageSize()); + builder.append(message); + for(int x = message.length(); x < getMessageSize(); x++) + { + builder.append('.'); + } + message = builder.toString(); + } + + for(int i = 0; i < getCount(); i++) + { + Properties properties = new Properties(); + properties.setMessageId(UnsignedLong.valueOf(i)); + properties.setReplyTo(responseReceiver.getAddress()); + + AmqpValue amqpValue = new AmqpValue(message); + Section[] sections = { new Header() , properties, amqpValue}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1, txn); + + Message responseMessage = responseReceiver.receive(false); + if(responseMessage != null) + { + responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn); + received++; + } + } + } + + if(txn != null) + { + txn.commit(); + } + + + while(received < getCount()) + { + Message responseMessage = responseReceiver.receive(); + responseReceiver.acknowledge(responseMessage.getDeliveryTag()); + received++; + } + + + + + s.close(); + session.close(); + conn.close(); + + if(session2 != null) + { + session2.close(); + conn2.close(); + } + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return true; + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java new file mode 100644 index 0000000000..8d9de4893f --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java @@ -0,0 +1,347 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +import java.util.*; + +public class Respond extends Util +{ + private static final String USAGE_STRING = "respond [options]
\n\nOptions:"; + private Connection _conn; + private Session _session; + private Receiver _receiver; + private Transaction _txn; + private Map _senders; + private UnsignedLong _responseMsgId = UnsignedLong.ZERO; + private Connection _conn2; + private Session _session2; + + public Respond(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return true; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasSingleLinkPerConnectionMode() + { + return true; + } + + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + public static void main(String[] args) + { + new Respond(args).run(); + } + + public void run() + { + try + { + + _senders = new HashMap(); + + final String queue = getArgs()[0]; + + String message = ""; + + _conn = newConnection(); + + + + if(isUseMultipleConnections()) + { + _conn2 = newConnection(); + _session2 = _conn2.createSession(); + } + + + _session = _conn.createSession(); + + + _receiver = _session.createReceiver(queue, getMode()); + _txn = null; + + int credit = 0; + int receivedCount = 0; + _responseMsgId = UnsignedLong.ZERO; + + Random random = null; + int batch = 0; + List txnMessages = null; + if(useTran()) + { + if(getRollbackRatio() != 0) + { + random = new Random(); + } + batch = getBatchSize(); + _txn = _session.createSessionLocalTransaction(); + txnMessages = new ArrayList(batch); + } + + + for(int i = 0; receivedCount < getCount(); i++) + { + + if(credit == 0) + { + if(getCount() - i <= getWindowSize()) + { + credit = getCount() - i; + + } + else + { + credit = getWindowSize(); + + } + + _receiver.setCredit(UnsignedInteger.valueOf(credit), false); + + if(!isBlock()) + _receiver.drain(); + } + + Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L); + credit--; + if(m==null) + { + if(useTran() && batch != getBatchSize()) + { + _txn.commit(); + } + break; + } + + System.out.println("Received Message: " + m.getPayload()); + + respond(m); + + + + if(useTran()) + { + + txnMessages.add(m); + + if(--batch == 0) + { + + if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio()) + { + _txn.commit(); + txnMessages.clear(); + receivedCount += getBatchSize(); + } + else + { + System.out.println("Random Rollback"); + _txn.rollback(); + double result; + do + { + _txn = _session.createSessionLocalTransaction(); + + for(Message msg : txnMessages) + { + respond(msg); + } + + result = random.nextDouble(); + if(result sections = m.getPayload(); + String replyTo = null; + Object correlationId = null; + for(Section section : sections) + { + if(section instanceof Properties) + { + replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue(); + correlationId = ((Properties) section).getMessageId(); + break; + } + } + + if(replyTo != null) + { + Sender s = _senders.get(replyTo); + if(s == null) + { + s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize()); + _senders.put(replyTo, s); + } + + List
replySections = new ArrayList
(sections); + + ListIterator
sectionIterator = replySections.listIterator(); + + while(sectionIterator.hasNext()) + { + Section section = sectionIterator.next(); + if(section instanceof Properties) + { + Properties newProps = new Properties(); + newProps.setTo(replyTo); + newProps.setCorrelationId(correlationId); + newProps.setMessageId(_responseMsgId); + _responseMsgId = _responseMsgId.add(UnsignedLong.ONE); + sectionIterator.set(newProps); + } + } + + Message replyMessage = new Message(replySections); + System.out.println("Sent Message: " + replySections); + s.send(replyMessage, _txn); + + } + _receiver.acknowledge(m.getDeliveryTag(), _txn); + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java new file mode 100644 index 0000000000..6f6575e083 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java @@ -0,0 +1,244 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.util.Arrays; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +public class Send extends Util +{ + private static final String USAGE_STRING = "send [options]
[ ...]\n\nOptions:"; + private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'}; + + + public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, Connection.ConnectionException + { + new Send(args).run(); + } + + + public Send(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return true; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return true; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + @Override + protected boolean hasSubjectOption() + { + return true; + } + + public void run() + { + + final String queue = getArgs()[0]; + + String message = ""; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + + Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName()); + + Transaction txn = null; + + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + } + + if(!useStdIn()) + { + if(getArgs().length <= 2) + { + if(getArgs().length == 2) + { + message = getArgs()[1]; + } + for(int i = 0; i < getCount(); i++) + { + + Properties properties = new Properties(); + properties.setMessageId(UnsignedLong.valueOf(i)); + if(getSubject() != null) + { + properties.setSubject(getSubject()); + } + Section bodySection; + byte[] bytes = (message + " " + i).getBytes(); + if(bytes.length < getMessageSize()) + { + byte[] origBytes = bytes; + bytes = new byte[getMessageSize()]; + System.arraycopy(origBytes,0,bytes,0,origBytes.length); + for(int x = origBytes.length; x < bytes.length; x++) + { + bytes[x] = (byte) '.'; + } + bodySection = new Data(new Binary(bytes)); + } + else + { + bodySection = new AmqpValue(message + " " + i); + } + + Section[] sections = {properties, bodySection}; + final Message message1 = new Message(Arrays.asList(sections)); + + s.send(message1, txn); + } + } + else + { + for(int i = 1; i < getArgs().length; i++) + { + s.send(new Message(getArgs()[i]), txn); + } + + } + } + else + { + LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in)); + + + try + { + while((message = buf.readLine()) != null) + { + s.send(new Message(message), txn); + } + } + catch (IOException e) + { + // TODO + e.printStackTrace(); + } + } + + if(txn != null) + { + txn.commit(); + } + + s.close(); + + session.close(); + conn.close(); + } + catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //TODO. + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //TODO. + } + + + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java new file mode 100644 index 0000000000..6fe2a6d510 --- /dev/null +++ b/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java @@ -0,0 +1,529 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.commons.cli.*; + +import java.util.logging.*; + +public abstract class Util +{ + + private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); + private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); + private String _host; + private String _username; + private String _password; + private int _port; + private int _count; + private boolean _useStdIn; + private boolean _useTran; + private String[] _args; + private AcknowledgeMode _mode; + private boolean _block; + private int _frameSize; + private int _messageSize; + private String _responseQueue; + private int _batchSize; + private double _rollbackRatio; + private String _linkName; + private String _containerName; + private boolean _durableLink; + private boolean _useMultipleConnections; + private int _windowSize = 100; + private String _subject; + private String _filter; + private String _remoteHost; + private boolean _useSSL; + + protected Util(String[] args) + { + CommandLineParser cmdLineParse = new PosixParser(); + + Options options = new Options(); + options.addOption("h","help",false,"show this help message and exit"); + options.addOption(OptionBuilder.withLongOpt("host") + .withDescription( "host to connect to (default 0.0.0.0)" ) + .hasArg(true) + .withArgName("HOST") + .create('H')); + options.addOption(OptionBuilder.withLongOpt("username") + .withDescription( "username to use for authentication" ) + .hasArg(true) + .withArgName("USERNAME") + .create('u')); + options.addOption(OptionBuilder.withLongOpt("password") + .withDescription( "password to use for authentication" ) + .hasArg(true) + .withArgName("PASSWORD") + .create('w')); + options.addOption(OptionBuilder.withLongOpt("port") + .withDescription( "port to connect to (default 5672)" ) + .hasArg(true) + .withArgName("PORT") + .create('p')); + options.addOption(OptionBuilder.withLongOpt("frame-size") + .withDescription( "specify the maximum frame size" ) + .hasArg(true) + .withArgName("FRAME_SIZE") + .create('f')); + options.addOption(OptionBuilder.withLongOpt("container-name") + .withDescription( "Container name" ) + .hasArg(true) + .withArgName("CONTAINER_NAME") + .create('C')); + + options.addOption(OptionBuilder.withLongOpt("ssl") + .withDescription("Use SSL") + .create('S')); + + options.addOption(OptionBuilder.withLongOpt("remote-hostname") + .withDescription( "hostname to supply in the open frame" ) + .hasArg(true) + .withArgName("HOST") + .create('O')); + + if(hasBlockOption()) + options.addOption(OptionBuilder.withLongOpt("block") + .withDescription("block until messages arrive") + .create('b')); + + if(hasCountOption()) + options.addOption(OptionBuilder.withLongOpt("count") + .withDescription( "number of messages to send (default 1)" ) + .hasArg(true) + .withArgName("COUNT") + .create('c')); + if(hasModeOption()) + options.addOption(OptionBuilder.withLongOpt("acknowledge-mode") + .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" ) + .hasArg(true) + .withArgName("MODE") + .create('k')); + + if(hasSubjectOption()) + options.addOption(OptionBuilder.withLongOpt("subject") + .withDescription( "subject message property" ) + .hasArg(true) + .withArgName("SUBJECT") + .create('s')); + + + if(hasSingleLinkPerConnectionMode()) + options.addOption(OptionBuilder.withLongOpt("single-link-per-connection") + .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once") + .hasArg(false) + .create('Z')); + + if(hasFilterOption()) + options.addOption(OptionBuilder.withLongOpt("filter") + .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#") + .hasArg(true) + .withArgName("=") + .create('F')); + + + if(hasTxnOption()) + { + options.addOption("x","txn",false,"use transactions"); + options.addOption(OptionBuilder.withLongOpt("batch-size") + .withDescription( "transaction batch size (default: 1)" ) + .hasArg(true) + .withArgName("BATCH-SIZE") + .create('B')); + options.addOption(OptionBuilder.withLongOpt("rollback-ratio") + .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" ) + .hasArg(true) + .withArgName("RATIO") + .create('R')); + } + + if(hasLinkDurableOption()) + { + options.addOption("d","durable-link",false,"use a durable link"); + } + + if(hasStdInOption()) + options.addOption("i","stdin",false,"read messages from stdin (one message per line)"); + + options.addOption(OptionBuilder.withLongOpt("trace") + .withDescription("trace logging specified categories: RAW, FRM") + .hasArg(true) + .withArgName("TRACE") + .create('t')); + if(hasSizeOption()) + options.addOption(OptionBuilder.withLongOpt("message-size") + .withDescription( "size to pad outgoing messages to" ) + .hasArg(true) + .withArgName("SIZE") + .create('z')); + + if(hasResponseQueueOption()) + options.addOption(OptionBuilder.withLongOpt("response-queue") + .withDescription( "response queue to reply to" ) + .hasArg(true) + .withArgName("RESPONSE_QUEUE") + .create('r')); + + if(hasLinkNameOption()) + { + options.addOption(OptionBuilder.withLongOpt("link") + .withDescription( "link name" ) + .hasArg(true) + .withArgName("LINK") + .create('l')); + } + + if(hasWindowSizeOption()) + { + options.addOption(OptionBuilder.withLongOpt("window-size") + .withDescription("credit window size") + .hasArg(true) + .withArgName("WINDOW-SIZE") + .create('W')); + } + + CommandLine cmdLine = null; + try + { + cmdLine = cmdLineParse.parse(options, args); + + } + catch (ParseException e) + { + printUsage(options); + System.exit(-1); + } + + if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty()) + { + printUsage(options); + System.exit(0); + } + _host = cmdLine.getOptionValue('H',"0.0.0.0"); + _remoteHost = cmdLine.getOptionValue('O',null); + String portStr = cmdLine.getOptionValue('p',"5672"); + String countStr = cmdLine.getOptionValue('c',"1"); + + _useSSL = cmdLine.hasOption('S'); + + if(hasWindowSizeOption()) + { + String windowSizeStr = cmdLine.getOptionValue('W',"100"); + _windowSize = Integer.parseInt(windowSizeStr); + } + + if(hasSubjectOption()) + { + _subject = cmdLine.getOptionValue('s'); + } + + if(cmdLine.hasOption('u')) + { + _username = cmdLine.getOptionValue('u'); + } + + if(cmdLine.hasOption('w')) + { + _password = cmdLine.getOptionValue('w'); + } + + if(cmdLine.hasOption('F')) + { + _filter = cmdLine.getOptionValue('F'); + } + + _port = Integer.parseInt(portStr); + + _containerName = cmdLine.getOptionValue('C'); + + if(hasBlockOption()) + _block = cmdLine.hasOption('b'); + + if(hasLinkNameOption()) + _linkName = cmdLine.getOptionValue('l'); + + + if(hasLinkDurableOption()) + _durableLink = cmdLine.hasOption('d'); + + if(hasCountOption()) + _count = Integer.parseInt(countStr); + + if(hasStdInOption()) + _useStdIn = cmdLine.hasOption('i'); + + if(hasSingleLinkPerConnectionMode()) + _useMultipleConnections = cmdLine.hasOption('Z'); + + if(hasTxnOption()) + { + _useTran = cmdLine.hasOption('x'); + _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1")); + _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0")); + } + + if(hasModeOption()) + { + _mode = AcknowledgeMode.ALO; + + if(cmdLine.hasOption('k')) + { + _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k')); + } + } + + if(hasResponseQueueOption()) + { + _responseQueue = cmdLine.getOptionValue('r'); + } + + _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536")); + + if(hasSizeOption()) + { + _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1")); + } + + String categoriesList = cmdLine.getOptionValue('t'); + String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]"); + for(String cat : categories) + { + if(cat.equalsIgnoreCase("FRM")) + { + FRAME_LOGGER.setLevel(Level.FINE); + Formatter formatter = new Formatter() + { + @Override + public String format(final LogRecord record) + { + return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n"; + } + }; + for(Handler handler : FRAME_LOGGER.getHandlers()) + { + FRAME_LOGGER.removeHandler(handler); + } + Handler handler = new ConsoleHandler(); + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + FRAME_LOGGER.addHandler(handler); + } + else if (cat.equalsIgnoreCase("RAW")) + { + RAW_LOGGER.setLevel(Level.FINE); + Formatter formatter = new Formatter() + { + @Override + public String format(final LogRecord record) + { + return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n"; + } + }; + for(Handler handler : RAW_LOGGER.getHandlers()) + { + RAW_LOGGER.removeHandler(handler); + } + Handler handler = new ConsoleHandler(); + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + RAW_LOGGER.addHandler(handler); + } + } + + + _args = cmdLine.getArgs(); + + } + + protected boolean hasFilterOption() + { + return false; + } + + protected boolean hasSubjectOption() + { + return false; + } + + protected boolean hasWindowSizeOption() + { + return false; + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return false; + } + + protected abstract boolean hasLinkDurableOption(); + + protected abstract boolean hasLinkNameOption(); + + protected abstract boolean hasResponseQueueOption(); + + protected abstract boolean hasSizeOption(); + + protected abstract boolean hasBlockOption(); + + protected abstract boolean hasStdInOption(); + + protected abstract boolean hasTxnOption(); + + protected abstract boolean hasModeOption(); + + protected abstract boolean hasCountOption(); + + public String getHost() + { + return _host; + } + + public String getUsername() + { + return _username; + } + + public String getPassword() + { + return _password; + } + + public int getPort() + { + return _port; + } + + public int getCount() + { + return _count; + } + + public boolean useStdIn() + { + return _useStdIn; + } + + public boolean useTran() + { + return _useTran; + } + + public AcknowledgeMode getMode() + { + return _mode; + } + + public boolean isBlock() + { + return _block; + } + + public String[] getArgs() + { + return _args; + } + + public int getMessageSize() + { + return _messageSize; + } + + public String getResponseQueue() + { + return _responseQueue; + } + + public int getBatchSize() + { + return _batchSize; + } + + public double getRollbackRatio() + { + return _rollbackRatio; + } + + public String getLinkName() + { + return _linkName; + } + + public boolean isDurableLink() + { + return _durableLink; + } + + public boolean isUseMultipleConnections() + { + return _useMultipleConnections; + } + + public void setUseMultipleConnections(boolean useMultipleConnections) + { + _useMultipleConnections = useMultipleConnections; + } + + public String getSubject() + { + return _subject; + } + + public void setSubject(String subject) + { + _subject = subject; + } + + protected abstract void printUsage(final Options options); + + protected abstract void run(); + + + public Connection newConnection() throws Connection.ConnectionException + { + Container container = getContainerName() == null ? new Container() : new Container(getContainerName()); + return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container, + _remoteHost == null ? getHost() : _remoteHost, _useSSL) + : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize, + container, _remoteHost == null ? getHost() : _remoteHost, _useSSL); + } + + public String getContainerName() + { + return _containerName; + } + + public int getWindowSize() + { + return _windowSize; + } + + public void setWindowSize(int windowSize) + { + _windowSize = windowSize; + } + + public String getFilter() + { + return _filter; + } +} -- cgit v1.2.1