From 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 28 Feb 2013 16:14:30 +0000 Subject: Update from trunk r1375509 through r1450773 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/amqp_1_0/client/Command.java | 43 -- .../apache/qpid/amqp_1_0/client/Connection.java | 115 +---- .../java/org/apache/qpid/amqp_1_0/client/Demo.java | 407 ---------------- .../java/org/apache/qpid/amqp_1_0/client/Dump.java | 136 ------ .../apache/qpid/amqp_1_0/client/Filereceiver.java | 347 -------------- .../apache/qpid/amqp_1_0/client/Filesender.java | 296 ------------ .../org/apache/qpid/amqp_1_0/client/ReadBytes.java | 77 --- .../org/apache/qpid/amqp_1_0/client/Receive.java | 246 ---------- .../org/apache/qpid/amqp_1_0/client/Receiver.java | 35 +- .../org/apache/qpid/amqp_1_0/client/Request.java | 249 ---------- .../org/apache/qpid/amqp_1_0/client/Respond.java | 347 -------------- .../java/org/apache/qpid/amqp_1_0/client/Send.java | 244 ---------- .../org/apache/qpid/amqp_1_0/client/SendBytes.java | 331 ------------- .../java/org/apache/qpid/amqp_1_0/client/Util.java | 529 --------------------- 14 files changed, 45 insertions(+), 3357 deletions(-) delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java delete mode 100644 java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java (limited to 'java/amqp-1-0-client/src') diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java deleted file mode 100644 index 3bb26744c4..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import java.lang.reflect.InvocationTargetException; - -public class Command -{ - public static void main(String[] args) throws - ClassNotFoundException, - NoSuchMethodException, - InvocationTargetException, - IllegalAccessException, - InstantiationException - { - String name = args[0]; - String[] cmdArgs = new String[args.length-1]; - System.arraycopy(args,1,cmdArgs,0,args.length-1); - name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase(); - Class clazz = (Class) Class.forName(name); - Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs); - util.run(); - - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index e3d56fae09..e501662dbb 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -20,6 +20,15 @@ */ package org.apache.qpid.amqp_1_0.client; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.net.ssl.SSLSocketFactory; import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; import org.apache.qpid.amqp_1_0.transport.AMQPTransport; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; @@ -30,17 +39,6 @@ import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.logging.Level; -import java.util.logging.Logger; - public class Connection { private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); @@ -224,7 +222,6 @@ public class Connection } - //ConnectionHandler.OutputHandler outputHandler = new ConnectionHandler.OutputHandler(outputStream, out, _conn.getDescribedTypeRegistry()); ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn); Thread outputThread = new Thread(outputHandler); outputThread.setDaemon(true); @@ -236,8 +233,6 @@ public class Connection final ConnectionHandler handler = new ConnectionHandler(_conn); final InputStream inputStream = s.getInputStream(); - //final AMQPTransport transport = new AMQPTransport(new AMQPFrameTransport(_conn)); - Thread inputThread = new Thread(new Runnable() { @@ -246,7 +241,6 @@ public class Connection try { doRead(handler, inputStream); -// doRead(transport, inputStream); } finally { @@ -268,85 +262,6 @@ public class Connection inputThread.setDaemon(true); inputThread.start(); -/* - Thread outputThread = new Thread(new Runnable() - { - - private int _lastWrite; - - public void run() - { - try - { -// doRead(handler, inputStream); - final Object lock = new Object(); - transport.setOutputStateChangeListener(new StateChangeListener() - { - - public void onStateChange(final boolean active) - { - synchronized (lock) - { - lock.notifyAll(); - } - } - }); - - synchronized(lock) - { - while(transport.isOpenForOutput()) - { - _lastWrite = 0; - transport.getNextBytes(new BytesProcessor() - { - - public void processBytes(final ByteBuffer buf) - { - _lastWrite = buf.remaining(); - try - { - outputStream.write(buf.array(), - buf.arrayOffset() + buf.position(), - buf.limit() - buf.position()); - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - }); - if(_lastWrite == 0 && transport.isOpenForOutput()) - { - try - { - lock.wait(1000); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - } - finally - { - if(_conn.closedForInput() && _conn.closedForOutput()) - { - try - { - s.close(); - } - catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - }); -*/ - _conn.open(); } @@ -394,7 +309,7 @@ public class Connection } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } } @@ -419,7 +334,7 @@ public class Connection { int read; boolean done = false; - while(!done && (read = inputStream.read(buf)) != -1) + while(!handler.isDone() && (read = inputStream.read(buf)) != -1) { ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read); Binary b = new Binary(buf,0,read); @@ -428,12 +343,6 @@ public class Connection { RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString()); } - /*System.err.println(b); - System.err.println("XXX: " + bbuf.hasRemaining() + "; " + handler.isDone()); - if(handler.isDone()) - { - System.err.println(handler.getClass().getName() + "IS DONE!"); - } */ while(bbuf.hasRemaining() && !handler.isDone()) { handler.parse(bbuf); @@ -444,7 +353,7 @@ public class Connection } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } } diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java deleted file mode 100644 index b58ce6bfe5..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class Demo extends Util -{ - private static final String USAGE_STRING = "demo [options] [ ...]\n\nOptions:"; - private static final String OPCODE = "opcode"; - private static final String ACTION = "action"; - private static final String MESSAGE_ID = "message-id"; - private static final String VENDOR = "vendor"; - private static final String LOG = "log"; - private static final String RECEIVED = "received"; - private static final String TEST = "test"; - private static final String APACHE = "apache"; - private static final String SENT = "sent"; - private static final String LINK_REF = "link-ref"; - private static final String HOST = "host"; - private static final String PORT = "port"; - private static final String SASL_USER = "sasl-user"; - private static final String SASL_PASSWORD = "sasl-password"; - private static final String ROLE = "role"; - private static final String ADDRESS = "address"; - private static final String SENDER = "sender"; - private static final String SEND_MESSAGE = "send-message"; - private static final String ANNOUNCE = "announce"; - private static final String MESSAGE_VENDOR = "message-vendor"; - private static final String CREATE_LINK = "create-link"; - - public static void main(String[] args) - { - new Demo(args).run(); - } - - public Demo(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected boolean hasWindowSizeOption() - { - return false; - } - - public void run() - { - - try - { - - final String vendor = getArgs()[0]; - final String queue = "control"; - - String message = ""; - - Connection conn = newConnection(); - Session session = conn.createSession(); - - - Receiver responseReceiver; - - responseReceiver = session.createTemporaryQueueReceiver(); - - - - - responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - - - Sender s = session.createSender(queue, getWindowSize(), getMode()); - - - Properties properties = new Properties(); - properties.setMessageId(java.util.UUID.randomUUID()); - properties.setReplyTo(responseReceiver.getAddress()); - - HashMap appPropMap = new HashMap(); - ApplicationProperties appProperties = new ApplicationProperties(appPropMap); - - appPropMap.put(OPCODE, ANNOUNCE); - appPropMap.put(VENDOR, vendor); - appPropMap.put(ADDRESS,responseReceiver.getAddress()); - - AmqpValue amqpValue = new AmqpValue(message); - Section[] sections = { properties, appProperties, amqpValue}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1); - - Map sendingLinks = new HashMap(); - Map receivingLinks = new HashMap(); - - - boolean done = false; - - while(!done) - { - boolean wait = true; - Message m = responseReceiver.receive(false); - if(m != null) - { - List
payload = m.getPayload(); - wait = false; - ApplicationProperties props = m.getApplicationProperties(); - Map map = props.getValue(); - String op = (String) map.get(OPCODE); - if("reset".equals(op)) - { - for(Sender sender : sendingLinks.values()) - { - try - { - sender.close(); - Session session1 = sender.getSession(); - session1.close(); - session1.getConnection().close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - for(Receiver receiver : receivingLinks.values()) - { - try - { - receiver.close(); - receiver.getSession().close(); - receiver.getSession().getConnection().close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - sendingLinks.clear(); - receivingLinks.clear(); - } - else if(CREATE_LINK.equals(op)) - { - Object linkRef = map.get(LINK_REF); - String host = (String) map.get(HOST); - Object o = map.get(PORT); - int port = Integer.parseInt(String.valueOf(o)); - String user = (String) map.get(SASL_USER); - String password = (String) map.get(SASL_PASSWORD); - String role = (String) map.get(ROLE); - String address = (String) map.get(ADDRESS); - System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password); - try{ - - - Connection conn2 = new Connection(host, port, user, password, host); - Session session2 = conn2.createSession(); - if(sendingLinks.containsKey(linkRef)) - { - try - { - sendingLinks.remove(linkRef).close(); - } - catch (Exception e) - { - - } - } - if(receivingLinks.containsKey(linkRef)) - { - try - { - receivingLinks.remove(linkRef).close(); - } - catch (Exception e) - { - - } - } - if(SENDER.equals(role)) - { - - System.err.println("%%% Creating sender (" + linkRef + ")"); - Sender sender = session2.createSender(address); - sendingLinks.put(linkRef, sender); - } - else - { - - System.err.println("%%% Creating receiver (" + linkRef + ")"); - Receiver receiver2 = session2.createReceiver(address); - receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - receivingLinks.put(linkRef, receiver2); - } - } - catch(Exception e) - { - e.printStackTrace(); - } - } - else if(SEND_MESSAGE.equals(op)) - { - Sender sender = sendingLinks.get(map.get(LINK_REF)); - Properties m2props = new Properties(); - Object messageId = map.get(MESSAGE_ID); - m2props.setMessageId(messageId); - Map m2propmap = new HashMap(); - m2propmap.put(OPCODE, TEST); - m2propmap.put(VENDOR, vendor); - ApplicationProperties m2appProps = new ApplicationProperties(m2propmap); - Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId))); - sender.send(m2); - - Map m3propmap = new HashMap(); - m3propmap.put(OPCODE, LOG); - m3propmap.put(ACTION, SENT); - m3propmap.put(MESSAGE_ID, messageId); - m3propmap.put(VENDOR, vendor); - m3propmap.put(MESSAGE_VENDOR, vendor); - - - Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), - new AmqpValue("AMQP-"+messageId))); - s.send(m3); - - } - - responseReceiver.acknowledge(m); - } - else - { - for(Map.Entry entry : receivingLinks.entrySet()) - { - m = entry.getValue().receive(false); - if(m != null) - { - wait = false; - - System.err.println("%%% Received message from " + entry.getKey()); - - Properties mp = m.getProperties(); - ApplicationProperties ap = m.getApplicationProperties(); - - Map m3propmap = new HashMap(); - m3propmap.put(OPCODE, LOG); - m3propmap.put(ACTION, RECEIVED); - m3propmap.put(MESSAGE_ID, mp.getMessageId()); - m3propmap.put(VENDOR, vendor); - m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR)); - - Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap), - new AmqpValue("AMQP-"+mp.getMessageId()))); - s.send(m3); - - entry.getValue().acknowledge(m); - } - - } - } - - if(wait) - { - try - { - Thread.sleep(500l); - } - catch (InterruptedException e) - { - e.printStackTrace(); //TODO. - } - } - - } - - - - - - - - - - s.close(); - session.close(); - conn.close(); - - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return false; - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java deleted file mode 100644 index 65d27b21f8..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.commons.cli.Options; - -public class Dump extends Util -{ - private static final String USAGE_STRING = "dump [options]
\n\nOptions:"; - - - protected Dump(String[] args) - { - super(args); - } - - public static void main(String[] args) - { - new Dump(args).run(); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasLinkNameOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasSizeOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasBlockOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasStdInOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasTxnOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasModeOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected boolean hasCountOption() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected void printUsage(Options options) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - - Sender s = session.createSender(queue, 10); - - Message message = new Message("dump me"); - message.setDeliveryTag(new Binary("dump".getBytes())); - - s.send(message); - - s.close(); - session.close(); - conn.close(); - - } catch (Connection.ConnectionException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java deleted file mode 100644 index 4d98655ad2..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.*; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; - -import java.io.*; -import java.nio.ByteBuffer; -import java.util.*; - -public class Filereceiver extends Util -{ - private static final String USAGE_STRING = "filereceiver [options]
\n\nOptions:"; - - protected Filereceiver(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return false; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - final String directoryName = getArgs()[1]; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - final File directory = new File(directoryName); - if(directory.isDirectory() && directory.canWrite()) - { - File tmpDirectory = new File(directoryName, ".tmp"); - if(!tmpDirectory.exists()) - { - tmpDirectory.mkdir(); - } - - String[] unsettledFiles = tmpDirectory.list(); - - Map unsettled = new HashMap(); - final Map unsettledFileNames = new HashMap(); - - Accepted accepted = new Accepted(); - - for(String fileName : unsettledFiles) - { - File theFile = new File(tmpDirectory, fileName); - if(theFile.isFile()) - { - if(fileName.startsWith("~") && fileName.endsWith("~")) - { - theFile.delete(); - } - else - { - int splitPoint = fileName.indexOf("."); - String deliveryTagStr = fileName.substring(0,splitPoint); - String actualFileName = fileName.substring(splitPoint+1); - - byte[] bytes = new byte[deliveryTagStr.length()/2]; - - - for(int i = 0; i < bytes.length; i++) - { - char c = deliveryTagStr.charAt(2*i); - char d = deliveryTagStr.charAt(1+(2*i)); - - bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) - | (d <= '9' ? d - '0' : d - 'W')); - - } - Binary deliveryTag = new Binary(bytes); - unsettled.put(deliveryTag, accepted); - unsettledFileNames.put(deliveryTag, fileName); - } - } - - } - - Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), - unsettled); - - Map remoteUnsettled = r.getRemoteUnsettled(); - - for(Map.Entry entry : unsettledFileNames.entrySet()) - { - if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) - { - - File tmpFile = new File(tmpDirectory, entry.getValue()); - final File dest = new File(directory, - entry.getValue().substring(entry.getValue().indexOf(".") + 1)); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - - tmpFile.renameTo(dest); - } - } - - - int credit = 10; - - r.setCredit(UnsignedInteger.valueOf(credit), true); - - - int received = 0; - Message m = null; - do - { - m = isBlock() && received == 0 ? r.receive() : r.receive(10000); - if(m != null) - { - if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) - { - final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); - final File unsettledFile = new File(tmpDirectory, - tmpFileName); - r.acknowledge(m, new Receiver.SettledAction() - { - public void onSettled(final Binary deliveryTag) - { - int splitPoint = tmpFileName.indexOf("."); - - String fileName = tmpFileName.substring(splitPoint+1); - - final File dest = new File(directory, fileName); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - unsettledFile.renameTo(dest); - unsettledFileNames.remove(deliveryTag); - } - }); - } - else - { - received++; - List
sections = m.getPayload(); - Binary deliveryTag = m.getDeliveryTag(); - StringBuilder tagNameBuilder = new StringBuilder(); - - ByteBuffer dtbuf = deliveryTag.asByteBuffer(); - while(dtbuf.hasRemaining()) - { - tagNameBuilder.append(String.format("%02x", dtbuf.get())); - } - - - ApplicationProperties properties = null; - List data = new ArrayList(); - int totalSize = 0; - for(Section section : sections) - { - if(section instanceof ApplicationProperties) - { - properties = (ApplicationProperties) section; - } - else if(section instanceof AmqpValue) - { - AmqpValue value = (AmqpValue) section; - if(value.getValue() instanceof Binary) - { - Binary binary = (Binary) value.getValue(); - data.add(binary); - totalSize += binary.getLength(); - - } - else - { - // TODO exception - } - } - else if(section instanceof Data) - { - Data value = (Data) section; - Binary binary = value.getValue(); - data.add(binary); - totalSize += binary.getLength(); - - } - } - if(properties != null) - { - final String fileName = (String) properties.getValue().get("filename"); - byte[] fileData = new byte[totalSize]; - ByteBuffer buf = ByteBuffer.wrap(fileData); - int offset = 0; - for(Binary bin : data) - { - buf.put(bin.asByteBuffer()); - } - File outputFile = new File(tmpDirectory, "~"+fileName+"~"); - if(outputFile.exists()) - { - outputFile.delete(); - } - FileOutputStream fos = new FileOutputStream(outputFile); - fos.write(fileData); - fos.flush(); - fos.close(); - - final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + - fileName); - outputFile.renameTo(unsettledFile); - r.acknowledge(m, new Receiver.SettledAction() - { - public void onSettled(final Binary deliveryTag) - { - final File dest = new File(directory, fileName); - if(dest.exists()) - { - System.err.println("Duplicate detected - filename " + dest.getName()); - } - unsettledFile.renameTo(dest); - - } - }); - - } - } - } - } - while(m != null); - - - r.close(); - } - else - { - System.err.println("No such directory: " + directoryName); - } - session.close(); - conn.close(); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); - } - catch (FileNotFoundException e) - { - e.printStackTrace(); //TODO. - } - catch (IOException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - public static void main(String[] args) - { - new Filereceiver(args).run(); - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java deleted file mode 100644 index 46e6ba537f..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.*; - -public class Filesender extends Util -{ - private static final String USAGE_STRING = "filesender [options]
\n\nOptions:"; - - protected Filesender(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return false; - } - - @Override - protected boolean hasModeOption() - { - return false; - } - - @Override - protected boolean hasCountOption() - { - return false; - } - - @Override - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - - } - - @Override - protected void run() - { - final String queue = getArgs()[0]; - final String directoryName = getArgs()[1]; - - try - { - MessageDigest md5 = MessageDigest.getInstance("MD5"); - Connection conn = newConnection(); - - Session session = conn.createSession(); - - File directory = new File(directoryName); - if(directory.isDirectory() && directory.canWrite()) - { - - File tmpDirectory = new File(directoryName, ".tmp"); - if(!tmpDirectory.exists()) - { - tmpDirectory.mkdir(); - } - - String[] unsettledFiles = tmpDirectory.list(); - - - - Map unsettled = new HashMap(); - Map unsettledFileNames = new HashMap(); - for(String fileName : unsettledFiles) - { - File aFile = new File(tmpDirectory, fileName); - if(aFile.canRead() && aFile.canWrite()) - { - Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); - unsettled.put(deliveryTag, null); - unsettledFileNames.put(deliveryTag, fileName); - } - } - - - Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(), - unsettled); - - Map remoteUnsettled = s.getRemoteUnsettled(); - - for(Map.Entry entry: unsettledFileNames.entrySet()) - { - if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) - { - (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue())); - } - } - - if(remoteUnsettled != null) - { - for(Map.Entry entry : remoteUnsettled.entrySet()) - { - if(entry.getValue() instanceof Accepted) - { - final String fileName = unsettledFileNames.get(entry.getKey()); - if(fileName != null) - { - - Message resumed = new Message(); - resumed.setDeliveryTag(entry.getKey()); - resumed.setDeliveryState(entry.getValue()); - resumed.setResume(Boolean.TRUE); - resumed.setSettled(Boolean.TRUE); - - - - final File unsettledFile = new File(tmpDirectory, fileName); - unsettledFile.delete(); - - s.send(resumed); - - } - - } - else if(entry.getValue() instanceof Received || entry.getValue() == null) - { - final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey())); - Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile); - resumed.setResume(Boolean.TRUE); - Sender.OutcomeAction action = new Sender.OutcomeAction() - { - public void onOutcome(Binary deliveryTag, Outcome outcome) - { - if(outcome instanceof Accepted) - { - unsettledFile.delete(); - } - } - }; - s.send(resumed, action); - - } - } - } - - - - String[] files = directory.list(); - - for(String fileName : files) - { - final File file = new File(directory, fileName); - - if(file.canRead() && file.canWrite() && !file.isDirectory()) - { - Message message = createMessageFromFile(md5, fileName, file); - - final File unsettledFile = new File(tmpDirectory, fileName); - - Sender.OutcomeAction action = new Sender.OutcomeAction() - { - public void onOutcome(Binary deliveryTag, Outcome outcome) - { - if(outcome instanceof Accepted) - { - unsettledFile.delete(); - } - } - }; - - file.renameTo(unsettledFile); - - s.send(message, action); - } - } - - s.close(); - } - else - { - System.err.println("No such directory: " + directory); - } - session.close(); - conn.close(); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); - } catch (FileNotFoundException e) - { - e.printStackTrace(); - } catch (IOException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (NoSuchAlgorithmException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - - } - - private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException - { - FileInputStream fis = new FileInputStream(file); - byte[] data = new byte[(int) file.length()]; - - int read = fis.read(data); - - fis.close(); - - Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName)); - Section amqpValue = new Data(new Binary(data)); - Message message = new Message(Arrays.asList(applicationProperties, amqpValue)); - Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); - message.setDeliveryTag(deliveryTag); - md5.reset(); - return message; - } - - public static void main(String[] args) - { - new Filesender(args).run(); - } -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java deleted file mode 100644 index 07ae54b54f..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.codec.ValueHandler; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -public class ReadBytes -{ - - public static void main(String[] args) throws IOException, AmqpErrorException - { - - if(args.length == 0) - { - readBytes(System.in); - } - else - { - for(String fileName : args) - { - System.out.println("=========================== " + fileName + " ==========================="); - final FileInputStream fis = new FileInputStream(fileName); - readBytes(fis); - fis.close(); - } - } - - } - - private static void readBytes(final InputStream inputStream) throws IOException, AmqpErrorException - { - byte[] bytes = new byte[4096]; - - ValueHandler valueHandler = new ValueHandler(AMQPDescribedTypeRegistry.newInstance()); - - int count; - - while((count = inputStream.read(bytes))!=-1) - { - ByteBuffer buf = ByteBuffer.wrap(bytes); - buf.limit(count); - while(buf.hasRemaining()) - { - - final Object value = valueHandler.parse(buf); - System.out.print((value == null ? "" : value.getClass().getName() + ":") +value +"\n"); - - } - } - - } - - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java deleted file mode 100644 index 0da9dc3fb7..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.commons.cli.*; -import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; - -import java.util.Collections; - -public class Receive extends Util -{ - private static final String USAGE_STRING = "receive [options]
\n\nOptions:"; - private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L); - private UnsignedLong _lastCorrelationId; - - public static void main(String[] args) - { - new Receive(args).run(); - } - - - public Receive(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return true; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - @Override - protected boolean hasFilterOption() - { - return true; - } - - protected void run() - { - - try - { - final String queue = getArgs()[0]; - - String message = ""; - - Connection conn = newConnection(); - - - Session session = conn.createSession(); - - Filter filter = null; - if(getFilter() != null) - { - String[] filterParts = getFilter().split("=",2); - if("exact-subject".equals(filterParts[0])) - { - filter = new ExactSubjectFilter(filterParts[1]); - } - else if("matching-subject".equals(filterParts[0])) - { - filter = new MatchingSubjectFilter(filterParts[1]); - } - else - { - System.err.println("Unknown filter type: " + filterParts[0]); - } - } - - Receiver r = - filter == null - ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink()) - : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null); - Transaction txn = null; - - int credit = 0; - int receivedCount = 0; - - if(!useStdIn()) - { - if(getArgs().length <= 2) - { - - Transaction txn2 = null; - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - txn2 = session.createSessionLocalTransaction(); - } - - for(int i = 0; i < getCount(); i++) - { - - if(credit == 0) - { - if(getCount() - i <= getWindowSize()) - { - credit = getCount() - i; - - } - else - { - credit = getWindowSize(); - - } - - { - r.setCredit(UnsignedInteger.valueOf(credit), false); - } - if(!isBlock()) - r.drain(); - } - - Message m = isBlock() ? r.receive() : r.receive(1000L); - credit--; - if(m==null) - { - break; - } - - - - r.acknowledge(m.getDeliveryTag(),txn); - - receivedCount++; - - System.out.println("Received Message : " + m.getPayload()); - } - - if(useTran()) - { - txn.commit(); - } - } - else - { - // TODO - } - } - else - { - // TODO - } - r.close(); - session.close(); - conn.close(); - System.out.println("Total Messages Received: " + receivedCount); - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index ad390fd498..8b792db1f1 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -241,7 +241,7 @@ public class Receiver implements DeliveryStateHandler } if(hasMore) { - xfr = receiveFromPrefetch(0L); + xfr = receiveFromPrefetch(-1l); if(xfr== null) { // TODO - this is wrong!!!! @@ -503,6 +503,37 @@ public class Receiver implements DeliveryStateHandler _endpoint.drain(); } + /** + * Waits for the receiver to drain or a message to be available to be received. + * @return true if the receiver has been drained. + */ + public boolean drainWait() + { + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + try + { + while( _prefetchQueue.peek()==null && !_endpoint.isDrained() && !_endpoint.isDetached() ) + { + lock.wait(); + } + } + catch (InterruptedException e) + { + } + } + return _prefetchQueue.peek()==null && _endpoint.isDrained(); + } + + /** + * Clears the receiver drain so that message delivery can resume. + */ + public void clearDrain() + { + _endpoint.clearDrain(); + } + public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn) { _endpoint.setLinkCredit(credit); @@ -558,4 +589,4 @@ public class Receiver implements DeliveryStateHandler void messageArrived(Receiver receiver); } -} \ No newline at end of file +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java deleted file mode 100644 index 6e1d15376c..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -import java.util.Arrays; - -public class Request extends Util -{ - private static final String USAGE_STRING = "request [options]
[ ...]\n\nOptions:"; - - public static void main(String[] args) - { - new Request(args).run(); - } - - public Request(String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return true; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - public void run() - { - - try - { - - - final String queue = getArgs()[0]; - - String message = ""; - - Connection conn = newConnection(); - Session session = conn.createSession(); - - Connection conn2; - Session session2; - Receiver responseReceiver; - - if(isUseMultipleConnections()) - { - conn2 = newConnection(); - session2 = conn2.createSession(); - responseReceiver = session2.createTemporaryQueueReceiver(); - } - else - { - conn2 = null; - session2 = null; - responseReceiver = session.createTemporaryQueueReceiver(); - } - - - - - responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - - - Sender s = session.createSender(queue, getWindowSize(), getMode()); - - Transaction txn = null; - - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - } - - int received = 0; - - if(getArgs().length >= 2) - { - message = getArgs()[1]; - if(message.length() < getMessageSize()) - { - StringBuilder builder = new StringBuilder(getMessageSize()); - builder.append(message); - for(int x = message.length(); x < getMessageSize(); x++) - { - builder.append('.'); - } - message = builder.toString(); - } - - for(int i = 0; i < getCount(); i++) - { - Properties properties = new Properties(); - properties.setMessageId(UnsignedLong.valueOf(i)); - properties.setReplyTo(responseReceiver.getAddress()); - - AmqpValue amqpValue = new AmqpValue(message); - Section[] sections = { new Header() , properties, amqpValue}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1, txn); - - Message responseMessage = responseReceiver.receive(false); - if(responseMessage != null) - { - responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn); - received++; - } - } - } - - if(txn != null) - { - txn.commit(); - } - - - while(received < getCount()) - { - Message responseMessage = responseReceiver.receive(); - responseReceiver.acknowledge(responseMessage.getDeliveryTag()); - received++; - } - - - - - s.close(); - session.close(); - conn.close(); - - if(session2 != null) - { - session2.close(); - conn2.close(); - } - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //TODO. - } - - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return true; - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java deleted file mode 100644 index 8d9de4893f..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -import java.util.*; - -public class Respond extends Util -{ - private static final String USAGE_STRING = "respond [options]
\n\nOptions:"; - private Connection _conn; - private Session _session; - private Receiver _receiver; - private Transaction _txn; - private Map _senders; - private UnsignedLong _responseMsgId = UnsignedLong.ZERO; - private Connection _conn2; - private Session _session2; - - public Respond(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return false; - } - - @Override - protected boolean hasLinkNameOption() - { - return false; - } - - @Override - protected boolean hasResponseQueueOption() - { - return true; - } - - @Override - protected boolean hasSizeOption() - { - return false; - } - - @Override - protected boolean hasBlockOption() - { - return true; - } - - @Override - protected boolean hasStdInOption() - { - return false; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasSingleLinkPerConnectionMode() - { - return true; - } - - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - public static void main(String[] args) - { - new Respond(args).run(); - } - - public void run() - { - try - { - - _senders = new HashMap(); - - final String queue = getArgs()[0]; - - String message = ""; - - _conn = newConnection(); - - - - if(isUseMultipleConnections()) - { - _conn2 = newConnection(); - _session2 = _conn2.createSession(); - } - - - _session = _conn.createSession(); - - - _receiver = _session.createReceiver(queue, getMode()); - _txn = null; - - int credit = 0; - int receivedCount = 0; - _responseMsgId = UnsignedLong.ZERO; - - Random random = null; - int batch = 0; - List txnMessages = null; - if(useTran()) - { - if(getRollbackRatio() != 0) - { - random = new Random(); - } - batch = getBatchSize(); - _txn = _session.createSessionLocalTransaction(); - txnMessages = new ArrayList(batch); - } - - - for(int i = 0; receivedCount < getCount(); i++) - { - - if(credit == 0) - { - if(getCount() - i <= getWindowSize()) - { - credit = getCount() - i; - - } - else - { - credit = getWindowSize(); - - } - - _receiver.setCredit(UnsignedInteger.valueOf(credit), false); - - if(!isBlock()) - _receiver.drain(); - } - - Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L); - credit--; - if(m==null) - { - if(useTran() && batch != getBatchSize()) - { - _txn.commit(); - } - break; - } - - System.out.println("Received Message: " + m.getPayload()); - - respond(m); - - - - if(useTran()) - { - - txnMessages.add(m); - - if(--batch == 0) - { - - if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio()) - { - _txn.commit(); - txnMessages.clear(); - receivedCount += getBatchSize(); - } - else - { - System.out.println("Random Rollback"); - _txn.rollback(); - double result; - do - { - _txn = _session.createSessionLocalTransaction(); - - for(Message msg : txnMessages) - { - respond(msg); - } - - result = random.nextDouble(); - if(result sections = m.getPayload(); - String replyTo = null; - Object correlationId = null; - for(Section section : sections) - { - if(section instanceof Properties) - { - replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue(); - correlationId = ((Properties) section).getMessageId(); - break; - } - } - - if(replyTo != null) - { - Sender s = _senders.get(replyTo); - if(s == null) - { - s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize()); - _senders.put(replyTo, s); - } - - List
replySections = new ArrayList
(sections); - - ListIterator
sectionIterator = replySections.listIterator(); - - while(sectionIterator.hasNext()) - { - Section section = sectionIterator.next(); - if(section instanceof Properties) - { - Properties newProps = new Properties(); - newProps.setTo(replyTo); - newProps.setCorrelationId(correlationId); - newProps.setMessageId(_responseMsgId); - _responseMsgId = _responseMsgId.add(UnsignedLong.ONE); - sectionIterator.set(newProps); - } - } - - Message replyMessage = new Message(replySections); - System.out.println("Sent Message: " + replySections); - s.send(replyMessage, _txn); - - } - _receiver.acknowledge(m.getDeliveryTag(), _txn); - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java deleted file mode 100644 index 6f6575e083..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.amqp_1_0.client; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.LineNumberReader; -import java.util.Arrays; - -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.commons.cli.*; - -public class Send extends Util -{ - private static final String USAGE_STRING = "send [options]
[ ...]\n\nOptions:"; - private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'}; - - - public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, Connection.ConnectionException - { - new Send(args).run(); - } - - - public Send(final String[] args) - { - super(args); - } - - @Override - protected boolean hasLinkDurableOption() - { - return true; - } - - @Override - protected boolean hasLinkNameOption() - { - return true; - } - - @Override - protected boolean hasResponseQueueOption() - { - return false; - } - - @Override - protected boolean hasSizeOption() - { - return true; - } - - @Override - protected boolean hasBlockOption() - { - return false; - } - - @Override - protected boolean hasStdInOption() - { - return true; - } - - @Override - protected boolean hasTxnOption() - { - return true; - } - - @Override - protected boolean hasModeOption() - { - return true; - } - - @Override - protected boolean hasCountOption() - { - return true; - } - - @Override - protected boolean hasWindowSizeOption() - { - return true; - } - - @Override - protected boolean hasSubjectOption() - { - return true; - } - - public void run() - { - - final String queue = getArgs()[0]; - - String message = ""; - - try - { - Connection conn = newConnection(); - - Session session = conn.createSession(); - - - Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName()); - - Transaction txn = null; - - if(useTran()) - { - txn = session.createSessionLocalTransaction(); - } - - if(!useStdIn()) - { - if(getArgs().length <= 2) - { - if(getArgs().length == 2) - { - message = getArgs()[1]; - } - for(int i = 0; i < getCount(); i++) - { - - Properties properties = new Properties(); - properties.setMessageId(UnsignedLong.valueOf(i)); - if(getSubject() != null) - { - properties.setSubject(getSubject()); - } - Section bodySection; - byte[] bytes = (message + " " + i).getBytes(); - if(bytes.length < getMessageSize()) - { - byte[] origBytes = bytes; - bytes = new byte[getMessageSize()]; - System.arraycopy(origBytes,0,bytes,0,origBytes.length); - for(int x = origBytes.length; x < bytes.length; x++) - { - bytes[x] = (byte) '.'; - } - bodySection = new Data(new Binary(bytes)); - } - else - { - bodySection = new AmqpValue(message + " " + i); - } - - Section[] sections = {properties, bodySection}; - final Message message1 = new Message(Arrays.asList(sections)); - - s.send(message1, txn); - } - } - else - { - for(int i = 1; i < getArgs().length; i++) - { - s.send(new Message(getArgs()[i]), txn); - } - - } - } - else - { - LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in)); - - - try - { - while((message = buf.readLine()) != null) - { - s.send(new Message(message), txn); - } - } - catch (IOException e) - { - // TODO - e.printStackTrace(); - } - } - - if(txn != null) - { - txn.commit(); - } - - s.close(); - - session.close(); - conn.close(); - } - catch (Sender.SenderClosingException e) - { - e.printStackTrace(); //TODO. - } - catch (Connection.ConnectionException e) - { - e.printStackTrace(); //TODO. - } - catch (Sender.SenderCreationException e) - { - e.printStackTrace(); //TODO. - } - - - } - - protected void printUsage(Options options) - { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(USAGE_STRING, options ); - } - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java deleted file mode 100644 index 6f97ecd810..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.codec.FrameWriter; -import org.apache.qpid.amqp_1_0.codec.ValueWriter; -import org.apache.qpid.amqp_1_0.framing.AMQFrame; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.FrameBody; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedByte; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.UnsignedShort; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.Footer; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.qpid.amqp_1_0.type.transport.Flow; - -import org.apache.qpid.amqp_1_0.type.transport.Transfer; - -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; - -public class SendBytes -{ - - public static void main(String[] args) throws - Sender.SenderCreationException, - Sender.SenderClosingException, - Connection.ConnectionException, - IOException, ParseException - { - Transfer xfr = new Transfer(); - Flow fs = new Flow(); - fs.setIncomingWindow(UnsignedInteger.valueOf(1024)); - fs.setDeliveryCount(UnsignedInteger.valueOf(2)); - fs.setLinkCredit(UnsignedInteger.valueOf(18)); - fs.setAvailable(UnsignedInteger.valueOf(0)); - fs.setDrain(false); - - xfr.setHandle(UnsignedInteger.valueOf(0)); - xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes())); - //xfr.setDeliveryTag(new Binary(new byte[] {0})); - xfr.setDeliveryId(UnsignedInteger.valueOf(0)); - xfr.setSettled(true); - - - Header h = new Header(); - Properties p = new Properties(); - p.setTo("queue"); - //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes())); - - Footer f = new Footer(Collections.EMPTY_MAP); - - Section[] sections = new Section[] { h,p,f}; - //Section[] sections = new Section[] { b }; - //Section[] sections = { h,p, b}; -/* - Fragment[] fragments = new Fragment[5]; - - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer(); - - SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry); - - int num = 0; - int i = 0; - for(Section s : sections) - { - Fragment frag = new Fragment(); - - frag.setPayload(s.encode(encoder)); - frag.setFirst(true); - frag.setLast(true); - frag.setSectionCode(s.getSectionCode()); - frag.setSectionNumber(UnsignedInteger.valueOf(num++)); - frag.setSectionOffset(UnsignedLong.valueOf(0L)); - fragments[i++] =frag; - } - - xfr.setFragments(fragments); -*/ - - encodeTypes("xfr",xfr); - - final byte[] result; - final Object input = xfr; -/* - result = encode(1024, input); - - boolean ok = true; - - for(int j = 10; ok && j < 400; j++) - { - - byte[] result2 = encode(j,input); - - for(int i = 0; i <400; i++) - { - if(result[i] != result2[i]) - { - System.out.println("result differs at " + i + " Splitting at " + j+ " [" + result[i] + " - " + result2[i] + "]"); - //break; - //ok = false; - - } - } - }*/ - //System.out.println(Arrays.equals(result, result2)); - - //doEncodes(); - /*OutputStream out = System.out; - if(args.length > 0) - { - out = new FileOutputStream(args[0]); - } - - Transfer xfr = new Transfer(); - fs.setSessionCredit(UnsignedInteger.valueOf(1024)); - fs.setTransferCount(UnsignedInteger.valueOf(2)); - fs.setLinkCredit(UnsignedInteger.valueOf(18)); - fs.setAvailable(UnsignedInteger.valueOf(0)); - fs.setDrain(false); - - xfr.setHandle(UnsignedInteger.valueOf(0)); - //xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes())); - xfr.setDeliveryTag(new Binary(new byte[] {0})); - xfr.setTransferId(UnsignedInteger.valueOf(0)); - xfr.setSettled(true); - xfr.setFlowState(fs); - - Header h = new Header(); - h.setTransmitTime(new Date(System.currentTimeMillis())); - Properties p = new Properties(); - p.setTo(new Address("queue")); - //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes())); - AmqpMapSection m = new AmqpMapSection(); - DataSection b = new DataSection("Hello World!".getBytes()); - - Footer f = new Footer(); - - Section[] sections = new Section[] { h,p,m,b,f}; - //Section[] sections = new Section[] { b }; - //Section[] sections = { h,p, b}; - List fragments = new ArrayList(5); - - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - - SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry); - - for(Section s : sections) - { - Fragment frag = new Fragment(); - - frag.setPayload(s.encode(encoder)); - frag.setFirst(true); - frag.setLast(true); - frag.setFormatCode(s.getSectionCode()); - frag.setFragmentOffset(null); - fragments.add(frag); - } - - xfr.setFragments(fragments); - - - Object[] objectsToWrite = new Object[] { xfr }; - ByteBuffer buf = ByteBuffer.allocate(4096); - - - for(Object obj : objectsToWrite) - { - ValueWriter writer = typeRegistry.getValueWriter(obj); - - int count; - - - do - { - count = writer.writeToBuffer(buf); - out.write(buf.array(), buf.arrayOffset(), count); - buf.clear(); - } while (!writer.isComplete()); - - } - - out.flush(); - out.close();*/ - - } - - public static void doEncodes() throws IOException, ParseException - { - encodeTypes("boolean", Boolean.TRUE, Boolean.FALSE); - encodeTypes("ubyte", UnsignedByte.valueOf((byte)0), UnsignedByte.valueOf((byte)1 ),UnsignedByte.valueOf((byte)3), UnsignedByte.valueOf((byte)42), UnsignedByte.valueOf("255")); - encodeTypes("byte", Byte.valueOf((byte)0), Byte.valueOf( (byte)1), Byte.valueOf((byte) 3), Byte.valueOf((byte) 42), Byte.valueOf((byte) 127), Byte.valueOf((byte) -1), Byte.valueOf((byte) -3), Byte.valueOf((byte) -42), Byte.valueOf( (byte)-128)); - encodeTypes("ushort", UnsignedShort.valueOf((short)0), UnsignedShort.valueOf((short)1), UnsignedShort.valueOf((short)3), UnsignedShort.valueOf((short)42), UnsignedShort.valueOf("65535")); - encodeTypes("short", Short.valueOf((short)0), Short.valueOf((short)1), Short.valueOf((short)3), Short.valueOf((short)42), Short.valueOf((short)32767), Short.valueOf((short)-1), Short.valueOf((short)-3), Short.valueOf((short)-42), Short.valueOf((short)-32768)); - encodeTypes("uint",UnsignedInteger.valueOf(0), UnsignedInteger.valueOf(1), UnsignedInteger.valueOf(3), UnsignedInteger.valueOf(42), UnsignedInteger.valueOf("4294967295")); - encodeTypes("int", 0, 1, 3, 42, 2147483647, -1, -3, -42, -2147483648); - encodeTypes("ulong", UnsignedLong.valueOf(0), UnsignedLong.valueOf(1), UnsignedLong.valueOf(3), UnsignedLong.valueOf(42), UnsignedLong.valueOf("18446744073709551615")); - encodeTypes("long", 0l, 1l, 3l, 42l, 9223372036854775807l, -1l, -3l, -42l, -9223372036854775808l); - encodeTypes("float", 3.14159); - encodeTypes("double", Double.valueOf(3.14159265359)); - encodeTypes("char", '?'); - - SimpleDateFormat df = new SimpleDateFormat("HHa z MMM d yyyy"); - - encodeTypes("timestamp", df.parse("9AM PST Dec 6 2010"), df.parse("9AM PST Dec 6 1910")); - encodeTypes("uuid", UUID.fromString("f275ea5e-0c57-4ad7-b11a-b20c563d3b71")); - encodeTypes("binary", new Binary( new byte[] {(byte)0xDE, (byte)0xAD, (byte)0xBE, (byte)0xEF}), new Binary(new byte[] { (byte)0xCA,(byte)0xFE, (byte)0xBA, (byte)0xBE})); - encodeTypes("string", "The quick brown fox jumped over the lazy cow."); - encodeTypes("symbol", Symbol.valueOf("connectathon")); - encodeTypes("list", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE})); - Map map = new HashMap(); - map.put("one", Long.valueOf(1)); - map.put("two", Long.valueOf(2)); - map.put("pi", Double.valueOf(3.14159265359)); - map.put("list:", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE})); - map.put(null, Boolean.TRUE); - encodeTypes("map", map); - encodeTypes("null", null); - - } - - static void encodeTypes(String name, Object... vals ) throws IOException - { - FileOutputStream out = new FileOutputStream("/home/rob/"+name+".out"); - ByteBuffer buf = ByteBuffer.allocate(4096); - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - - if(vals != null) - { - for(Object obj : vals) - { - ValueWriter writer = typeRegistry.getValueWriter(obj); - - int count; - - - do - { - count = writer.writeToBuffer(buf); - out.write(buf.array(), buf.arrayOffset(), count); - buf.clear(); - } while (!writer.isComplete()); - - } - } - else - { - ValueWriter writer = typeRegistry.getValueWriter(null); - - int count; - - - do - { - count = writer.writeToBuffer(buf); - out.write(buf.array(), buf.arrayOffset(), count); - buf.clear(); - } while (!writer.isComplete()); - - } - out.flush(); - out.close(); - - } - - static byte[] encode(int size, Object... vals) - { - byte[] result = new byte[10000]; - int pos = 0; - - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - AMQFrame frame = AMQFrame.createAMQFrame((short) 0, (FrameBody) vals[0]); - FrameWriter writer = new FrameWriter(typeRegistry); - /*for(Object obj : vals) - { - final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - ValueWriter writer = typeRegistry.getValueWriter(obj); -*/ - int count; - - ByteBuffer buf = ByteBuffer.wrap(result, pos, size); - - do - { - - writer.writeToBuffer(buf); - pos = buf.position(); - buf = ByteBuffer.wrap(result, pos, size); - if(!writer.isComplete()) - { - count = 3; - } - - } while (!writer.isComplete()); -/* - - } -*/ - - return result; - - } - - -} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java deleted file mode 100644 index 6fe2a6d510..0000000000 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java +++ /dev/null @@ -1,529 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.amqp_1_0.client; - -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.commons.cli.*; - -import java.util.logging.*; - -public abstract class Util -{ - - private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); - private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); - private String _host; - private String _username; - private String _password; - private int _port; - private int _count; - private boolean _useStdIn; - private boolean _useTran; - private String[] _args; - private AcknowledgeMode _mode; - private boolean _block; - private int _frameSize; - private int _messageSize; - private String _responseQueue; - private int _batchSize; - private double _rollbackRatio; - private String _linkName; - private String _containerName; - private boolean _durableLink; - private boolean _useMultipleConnections; - private int _windowSize = 100; - private String _subject; - private String _filter; - private String _remoteHost; - private boolean _useSSL; - - protected Util(String[] args) - { - CommandLineParser cmdLineParse = new PosixParser(); - - Options options = new Options(); - options.addOption("h","help",false,"show this help message and exit"); - options.addOption(OptionBuilder.withLongOpt("host") - .withDescription( "host to connect to (default 0.0.0.0)" ) - .hasArg(true) - .withArgName("HOST") - .create('H')); - options.addOption(OptionBuilder.withLongOpt("username") - .withDescription( "username to use for authentication" ) - .hasArg(true) - .withArgName("USERNAME") - .create('u')); - options.addOption(OptionBuilder.withLongOpt("password") - .withDescription( "password to use for authentication" ) - .hasArg(true) - .withArgName("PASSWORD") - .create('w')); - options.addOption(OptionBuilder.withLongOpt("port") - .withDescription( "port to connect to (default 5672)" ) - .hasArg(true) - .withArgName("PORT") - .create('p')); - options.addOption(OptionBuilder.withLongOpt("frame-size") - .withDescription( "specify the maximum frame size" ) - .hasArg(true) - .withArgName("FRAME_SIZE") - .create('f')); - options.addOption(OptionBuilder.withLongOpt("container-name") - .withDescription( "Container name" ) - .hasArg(true) - .withArgName("CONTAINER_NAME") - .create('C')); - - options.addOption(OptionBuilder.withLongOpt("ssl") - .withDescription("Use SSL") - .create('S')); - - options.addOption(OptionBuilder.withLongOpt("remote-hostname") - .withDescription( "hostname to supply in the open frame" ) - .hasArg(true) - .withArgName("HOST") - .create('O')); - - if(hasBlockOption()) - options.addOption(OptionBuilder.withLongOpt("block") - .withDescription("block until messages arrive") - .create('b')); - - if(hasCountOption()) - options.addOption(OptionBuilder.withLongOpt("count") - .withDescription( "number of messages to send (default 1)" ) - .hasArg(true) - .withArgName("COUNT") - .create('c')); - if(hasModeOption()) - options.addOption(OptionBuilder.withLongOpt("acknowledge-mode") - .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" ) - .hasArg(true) - .withArgName("MODE") - .create('k')); - - if(hasSubjectOption()) - options.addOption(OptionBuilder.withLongOpt("subject") - .withDescription( "subject message property" ) - .hasArg(true) - .withArgName("SUBJECT") - .create('s')); - - - if(hasSingleLinkPerConnectionMode()) - options.addOption(OptionBuilder.withLongOpt("single-link-per-connection") - .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once") - .hasArg(false) - .create('Z')); - - if(hasFilterOption()) - options.addOption(OptionBuilder.withLongOpt("filter") - .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#") - .hasArg(true) - .withArgName("=") - .create('F')); - - - if(hasTxnOption()) - { - options.addOption("x","txn",false,"use transactions"); - options.addOption(OptionBuilder.withLongOpt("batch-size") - .withDescription( "transaction batch size (default: 1)" ) - .hasArg(true) - .withArgName("BATCH-SIZE") - .create('B')); - options.addOption(OptionBuilder.withLongOpt("rollback-ratio") - .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" ) - .hasArg(true) - .withArgName("RATIO") - .create('R')); - } - - if(hasLinkDurableOption()) - { - options.addOption("d","durable-link",false,"use a durable link"); - } - - if(hasStdInOption()) - options.addOption("i","stdin",false,"read messages from stdin (one message per line)"); - - options.addOption(OptionBuilder.withLongOpt("trace") - .withDescription("trace logging specified categories: RAW, FRM") - .hasArg(true) - .withArgName("TRACE") - .create('t')); - if(hasSizeOption()) - options.addOption(OptionBuilder.withLongOpt("message-size") - .withDescription( "size to pad outgoing messages to" ) - .hasArg(true) - .withArgName("SIZE") - .create('z')); - - if(hasResponseQueueOption()) - options.addOption(OptionBuilder.withLongOpt("response-queue") - .withDescription( "response queue to reply to" ) - .hasArg(true) - .withArgName("RESPONSE_QUEUE") - .create('r')); - - if(hasLinkNameOption()) - { - options.addOption(OptionBuilder.withLongOpt("link") - .withDescription( "link name" ) - .hasArg(true) - .withArgName("LINK") - .create('l')); - } - - if(hasWindowSizeOption()) - { - options.addOption(OptionBuilder.withLongOpt("window-size") - .withDescription("credit window size") - .hasArg(true) - .withArgName("WINDOW-SIZE") - .create('W')); - } - - CommandLine cmdLine = null; - try - { - cmdLine = cmdLineParse.parse(options, args); - - } - catch (ParseException e) - { - printUsage(options); - System.exit(-1); - } - - if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty()) - { - printUsage(options); - System.exit(0); - } - _host = cmdLine.getOptionValue('H',"0.0.0.0"); - _remoteHost = cmdLine.getOptionValue('O',null); - String portStr = cmdLine.getOptionValue('p',"5672"); - String countStr = cmdLine.getOptionValue('c',"1"); - - _useSSL = cmdLine.hasOption('S'); - - if(hasWindowSizeOption()) - { - String windowSizeStr = cmdLine.getOptionValue('W',"100"); - _windowSize = Integer.parseInt(windowSizeStr); - } - - if(hasSubjectOption()) - { - _subject = cmdLine.getOptionValue('s'); - } - - if(cmdLine.hasOption('u')) - { - _username = cmdLine.getOptionValue('u'); - } - - if(cmdLine.hasOption('w')) - { - _password = cmdLine.getOptionValue('w'); - } - - if(cmdLine.hasOption('F')) - { - _filter = cmdLine.getOptionValue('F'); - } - - _port = Integer.parseInt(portStr); - - _containerName = cmdLine.getOptionValue('C'); - - if(hasBlockOption()) - _block = cmdLine.hasOption('b'); - - if(hasLinkNameOption()) - _linkName = cmdLine.getOptionValue('l'); - - - if(hasLinkDurableOption()) - _durableLink = cmdLine.hasOption('d'); - - if(hasCountOption()) - _count = Integer.parseInt(countStr); - - if(hasStdInOption()) - _useStdIn = cmdLine.hasOption('i'); - - if(hasSingleLinkPerConnectionMode()) - _useMultipleConnections = cmdLine.hasOption('Z'); - - if(hasTxnOption()) - { - _useTran = cmdLine.hasOption('x'); - _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1")); - _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0")); - } - - if(hasModeOption()) - { - _mode = AcknowledgeMode.ALO; - - if(cmdLine.hasOption('k')) - { - _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k')); - } - } - - if(hasResponseQueueOption()) - { - _responseQueue = cmdLine.getOptionValue('r'); - } - - _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536")); - - if(hasSizeOption()) - { - _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1")); - } - - String categoriesList = cmdLine.getOptionValue('t'); - String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]"); - for(String cat : categories) - { - if(cat.equalsIgnoreCase("FRM")) - { - FRAME_LOGGER.setLevel(Level.FINE); - Formatter formatter = new Formatter() - { - @Override - public String format(final LogRecord record) - { - return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n"; - } - }; - for(Handler handler : FRAME_LOGGER.getHandlers()) - { - FRAME_LOGGER.removeHandler(handler); - } - Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); - FRAME_LOGGER.addHandler(handler); - } - else if (cat.equalsIgnoreCase("RAW")) - { - RAW_LOGGER.setLevel(Level.FINE); - Formatter formatter = new Formatter() - { - @Override - public String format(final LogRecord record) - { - return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n"; - } - }; - for(Handler handler : RAW_LOGGER.getHandlers()) - { - RAW_LOGGER.removeHandler(handler); - } - Handler handler = new ConsoleHandler(); - handler.setLevel(Level.FINE); - handler.setFormatter(formatter); - RAW_LOGGER.addHandler(handler); - } - } - - - _args = cmdLine.getArgs(); - - } - - protected boolean hasFilterOption() - { - return false; - } - - protected boolean hasSubjectOption() - { - return false; - } - - protected boolean hasWindowSizeOption() - { - return false; - } - - protected boolean hasSingleLinkPerConnectionMode() - { - return false; - } - - protected abstract boolean hasLinkDurableOption(); - - protected abstract boolean hasLinkNameOption(); - - protected abstract boolean hasResponseQueueOption(); - - protected abstract boolean hasSizeOption(); - - protected abstract boolean hasBlockOption(); - - protected abstract boolean hasStdInOption(); - - protected abstract boolean hasTxnOption(); - - protected abstract boolean hasModeOption(); - - protected abstract boolean hasCountOption(); - - public String getHost() - { - return _host; - } - - public String getUsername() - { - return _username; - } - - public String getPassword() - { - return _password; - } - - public int getPort() - { - return _port; - } - - public int getCount() - { - return _count; - } - - public boolean useStdIn() - { - return _useStdIn; - } - - public boolean useTran() - { - return _useTran; - } - - public AcknowledgeMode getMode() - { - return _mode; - } - - public boolean isBlock() - { - return _block; - } - - public String[] getArgs() - { - return _args; - } - - public int getMessageSize() - { - return _messageSize; - } - - public String getResponseQueue() - { - return _responseQueue; - } - - public int getBatchSize() - { - return _batchSize; - } - - public double getRollbackRatio() - { - return _rollbackRatio; - } - - public String getLinkName() - { - return _linkName; - } - - public boolean isDurableLink() - { - return _durableLink; - } - - public boolean isUseMultipleConnections() - { - return _useMultipleConnections; - } - - public void setUseMultipleConnections(boolean useMultipleConnections) - { - _useMultipleConnections = useMultipleConnections; - } - - public String getSubject() - { - return _subject; - } - - public void setSubject(String subject) - { - _subject = subject; - } - - protected abstract void printUsage(final Options options); - - protected abstract void run(); - - - public Connection newConnection() throws Connection.ConnectionException - { - Container container = getContainerName() == null ? new Container() : new Container(getContainerName()); - return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container, - _remoteHost == null ? getHost() : _remoteHost, _useSSL) - : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize, - container, _remoteHost == null ? getHost() : _remoteHost, _useSSL); - } - - public String getContainerName() - { - return _containerName; - } - - public int getWindowSize() - { - return _windowSize; - } - - public void setWindowSize(int windowSize) - { - _windowSize = windowSize; - } - - public String getFilter() - { - return _filter; - } -} -- cgit v1.2.1