diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-04-04 21:10:07 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-04-04 21:10:07 +0000 |
| commit | f93a96fc0cccd29885383f2781f01484f7833eeb (patch) | |
| tree | 2f4a81835f29bc9a61f9d4947eed8fc0654f3fe7 /java/amqp-1-0-client | |
| parent | 6d83a5faddfeff7c3a788907d886dae7459dbaa1 (diff) | |
| download | qpid-python-f93a96fc0cccd29885383f2781f01484f7833eeb.tar.gz | |
QPID-3933 : [Java] Add interim AMQP 1-0 implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1309594 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/amqp-1-0-client')
21 files changed, 5415 insertions, 0 deletions
diff --git a/java/amqp-1-0-client/build.xml b/java/amqp-1-0-client/build.xml new file mode 100644 index 0000000000..173d7540d4 --- /dev/null +++ b/java/amqp-1-0-client/build.xml @@ -0,0 +1,29 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<project name="AMQP 1.0 Client" default="build"> + + <property name="module.genpom" value="true"/> + <property name="module.depends" value="amqp-1-0-common"/> + + + <import file="../module.xml"/> + +</project> diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java new file mode 100644 index 0000000000..05d176bc35 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/AcknowledgeMode.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +public enum AcknowledgeMode +{ + AMO, + ALO, + EO +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java new file mode 100644 index 0000000000..3bb26744c4 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Command.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import java.lang.reflect.InvocationTargetException; + +public class Command +{ + public static void main(String[] args) throws + ClassNotFoundException, + NoSuchMethodException, + InvocationTargetException, + IllegalAccessException, + InstantiationException + { + String name = args[0]; + String[] cmdArgs = new String[args.length-1]; + System.arraycopy(args,1,cmdArgs,0,args.length-1); + name = "org.apache.qpid.amqp_1_0.client." + String.valueOf(name.charAt(0)).toUpperCase() + name.substring(1).toLowerCase(); + Class<Util> clazz = (Class<Util>) Class.forName(name); + Util util = clazz.getDeclaredConstructor(String[].class).newInstance((Object)cmdArgs); + util.run(); + + } +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java new file mode 100644 index 0000000000..e3d56fae09 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -0,0 +1,481 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
+import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class Connection
+{
+ private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
+ private static final int MAX_FRAME_SIZE = 65536;
+
+ private String _address;
+ private ConnectionEndpoint _conn;
+ private int _sessionCount;
+
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password, String remoteHostname) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHostname);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize) throws ConnectionException
+ {
+ this(address,port,username,password,maxFrameSize,new Container());
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container) throws ConnectionException
+ {
+ this(address,port,username,password,MAX_FRAME_SIZE,container);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container) throws ConnectionException
+ {
+ this(address,port,username,password,maxFrameSize,container, null);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container,
+ final String remoteHostname) throws ConnectionException
+ {
+ this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container,
+ final boolean ssl) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final String remoteHost,
+ final boolean ssl) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container,
+ final String remoteHost,
+ final boolean ssl) throws ConnectionException
+ {
+ this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
+ }
+
+ public Connection(final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container,
+ final String remoteHostname, boolean ssl) throws ConnectionException
+ {
+
+ _address = address;
+
+ try
+ {
+ final Socket s;
+ if(ssl)
+ {
+ s = SSLSocketFactory.getDefault().createSocket(address, port);
+ }
+ else
+ {
+ s = new Socket(address, port);
+ }
+
+
+ Principal principal = username == null ? null : new Principal()
+ {
+
+ public String getName()
+ {
+ return username;
+ }
+ };
+ _conn = new ConnectionEndpoint(container, principal, password);
+ _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
+ _conn.setRemoteAddress(s.getRemoteSocketAddress());
+ _conn.setRemoteHostname(remoteHostname);
+
+
+
+ ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
+
+
+ final OutputStream outputStream = s.getOutputStream();
+
+ ConnectionHandler.BytesSource src;
+
+ if(_conn.requiresSASL())
+ {
+ ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
+
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)3,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
+ new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
+
+ _conn.setSaslFrameOutput(saslOut);
+ }
+ else
+ {
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
+ }
+
+
+ //ConnectionHandler.OutputHandler outputHandler = new ConnectionHandler.OutputHandler(outputStream, out, _conn.getDescribedTypeRegistry());
+ ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn);
+ Thread outputThread = new Thread(outputHandler);
+ outputThread.setDaemon(true);
+ outputThread.start();
+ _conn.setFrameOutputHandler(out);
+
+
+
+ final ConnectionHandler handler = new ConnectionHandler(_conn);
+ final InputStream inputStream = s.getInputStream();
+
+ //final AMQPTransport transport = new AMQPTransport(new AMQPFrameTransport(_conn));
+
+ Thread inputThread = new Thread(new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ doRead(handler, inputStream);
+// doRead(transport, inputStream);
+ }
+ finally
+ {
+ if(_conn.closedForInput() && _conn.closedForOutput())
+ {
+ try
+ {
+ s.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
+ });
+
+ inputThread.setDaemon(true);
+ inputThread.start();
+
+/*
+ Thread outputThread = new Thread(new Runnable()
+ {
+
+ private int _lastWrite;
+
+ public void run()
+ {
+ try
+ {
+// doRead(handler, inputStream);
+ final Object lock = new Object();
+ transport.setOutputStateChangeListener(new StateChangeListener()
+ {
+
+ public void onStateChange(final boolean active)
+ {
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ });
+
+ synchronized(lock)
+ {
+ while(transport.isOpenForOutput())
+ {
+ _lastWrite = 0;
+ transport.getNextBytes(new BytesProcessor()
+ {
+
+ public void processBytes(final ByteBuffer buf)
+ {
+ _lastWrite = buf.remaining();
+ try
+ {
+ outputStream.write(buf.array(),
+ buf.arrayOffset() + buf.position(),
+ buf.limit() - buf.position());
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ });
+ if(_lastWrite == 0 && transport.isOpenForOutput())
+ {
+ try
+ {
+ lock.wait(1000);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ if(_conn.closedForInput() && _conn.closedForOutput())
+ {
+ try
+ {
+ s.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
+ });
+*/
+
+ _conn.open();
+
+ }
+ catch (IOException e)
+ {
+ throw new ConnectionException(e);
+ }
+
+
+ }
+
+
+
+ private void doRead(final AMQPTransport transport, final InputStream inputStream)
+ {
+ byte[] buf = new byte[2<<15];
+ ByteBuffer bbuf = ByteBuffer.wrap(buf);
+ final Object lock = new Object();
+ transport.setInputStateChangeListener(new StateChangeListener(){
+
+ public void onStateChange(final boolean active)
+ {
+ synchronized(lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ });
+
+ try
+ {
+ int read;
+ while((read = inputStream.read(buf)) != -1)
+ {
+ bbuf.position(0);
+ bbuf.limit(read);
+
+ while(bbuf.hasRemaining() && transport.isOpenForInput())
+ {
+ transport.processBytes(bbuf);
+ }
+
+
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ }
+
+ public Session createSession()
+ {
+ Session session = new Session(this,String.valueOf(_sessionCount++));
+ return session;
+ }
+
+ public ConnectionEndpoint getEndpoint()
+ {
+ return _conn;
+ }
+
+ private void doRead(final ConnectionHandler handler, final InputStream inputStream)
+ {
+ byte[] buf = new byte[2<<15];
+
+
+ try
+ {
+ int read;
+ boolean done = false;
+ while(!done && (read = inputStream.read(buf)) != -1)
+ {
+ ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
+ Binary b = new Binary(buf,0,read);
+
+ if(RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
+ }
+ /*System.err.println(b);
+ System.err.println("XXX: " + bbuf.hasRemaining() + "; " + handler.isDone());
+ if(handler.isDone())
+ {
+ System.err.println(handler.getClass().getName() + "IS DONE!");
+ } */
+ while(bbuf.hasRemaining() && !handler.isDone())
+ {
+ handler.parse(bbuf);
+ }
+
+
+ }
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ public void close()
+ {
+ _conn.close();
+
+ synchronized (_conn.getLock())
+ {
+ while(!_conn.closedForInput())
+ {
+ try
+ {
+ _conn.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+ }
+ }
+ }
+
+
+ public static class ConnectionException extends Exception
+ {
+ public ConnectionException(Throwable cause)
+ {
+ super(cause);
+ }
+ }
+
+
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java new file mode 100644 index 0000000000..b58ce6bfe5 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java @@ -0,0 +1,407 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Demo extends Util
+{
+ private static final String USAGE_STRING = "demo [options] <vendor> [<content> ...]\n\nOptions:";
+ private static final String OPCODE = "opcode";
+ private static final String ACTION = "action";
+ private static final String MESSAGE_ID = "message-id";
+ private static final String VENDOR = "vendor";
+ private static final String LOG = "log";
+ private static final String RECEIVED = "received";
+ private static final String TEST = "test";
+ private static final String APACHE = "apache";
+ private static final String SENT = "sent";
+ private static final String LINK_REF = "link-ref";
+ private static final String HOST = "host";
+ private static final String PORT = "port";
+ private static final String SASL_USER = "sasl-user";
+ private static final String SASL_PASSWORD = "sasl-password";
+ private static final String ROLE = "role";
+ private static final String ADDRESS = "address";
+ private static final String SENDER = "sender";
+ private static final String SEND_MESSAGE = "send-message";
+ private static final String ANNOUNCE = "announce";
+ private static final String MESSAGE_VENDOR = "message-vendor";
+ private static final String CREATE_LINK = "create-link";
+
+ public static void main(String[] args)
+ {
+ new Demo(args).run();
+ }
+
+ public Demo(String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return false;
+ }
+
+ public void run()
+ {
+
+ try
+ {
+
+ final String vendor = getArgs()[0];
+ final String queue = "control";
+
+ String message = "";
+
+ Connection conn = newConnection();
+ Session session = conn.createSession();
+
+
+ Receiver responseReceiver;
+
+ responseReceiver = session.createTemporaryQueueReceiver();
+
+
+
+
+ responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+
+
+ Sender s = session.createSender(queue, getWindowSize(), getMode());
+
+
+ Properties properties = new Properties();
+ properties.setMessageId(java.util.UUID.randomUUID());
+ properties.setReplyTo(responseReceiver.getAddress());
+
+ HashMap appPropMap = new HashMap();
+ ApplicationProperties appProperties = new ApplicationProperties(appPropMap);
+
+ appPropMap.put(OPCODE, ANNOUNCE);
+ appPropMap.put(VENDOR, vendor);
+ appPropMap.put(ADDRESS,responseReceiver.getAddress());
+
+ AmqpValue amqpValue = new AmqpValue(message);
+ Section[] sections = { properties, appProperties, amqpValue};
+ final Message message1 = new Message(Arrays.asList(sections));
+
+ s.send(message1);
+
+ Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
+ Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
+
+
+ boolean done = false;
+
+ while(!done)
+ {
+ boolean wait = true;
+ Message m = responseReceiver.receive(false);
+ if(m != null)
+ {
+ List<Section> payload = m.getPayload();
+ wait = false;
+ ApplicationProperties props = m.getApplicationProperties();
+ Map map = props.getValue();
+ String op = (String) map.get(OPCODE);
+ if("reset".equals(op))
+ {
+ for(Sender sender : sendingLinks.values())
+ {
+ try
+ {
+ sender.close();
+ Session session1 = sender.getSession();
+ session1.close();
+ session1.getConnection().close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ for(Receiver receiver : receivingLinks.values())
+ {
+ try
+ {
+ receiver.close();
+ receiver.getSession().close();
+ receiver.getSession().getConnection().close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ sendingLinks.clear();
+ receivingLinks.clear();
+ }
+ else if(CREATE_LINK.equals(op))
+ {
+ Object linkRef = map.get(LINK_REF);
+ String host = (String) map.get(HOST);
+ Object o = map.get(PORT);
+ int port = Integer.parseInt(String.valueOf(o));
+ String user = (String) map.get(SASL_USER);
+ String password = (String) map.get(SASL_PASSWORD);
+ String role = (String) map.get(ROLE);
+ String address = (String) map.get(ADDRESS);
+ System.err.println("Host: " + host + "\tPort: " + port + "\t user: " + user +"\t password: " + password);
+ try{
+
+
+ Connection conn2 = new Connection(host, port, user, password, host);
+ Session session2 = conn2.createSession();
+ if(sendingLinks.containsKey(linkRef))
+ {
+ try
+ {
+ sendingLinks.remove(linkRef).close();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ if(receivingLinks.containsKey(linkRef))
+ {
+ try
+ {
+ receivingLinks.remove(linkRef).close();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ if(SENDER.equals(role))
+ {
+
+ System.err.println("%%% Creating sender (" + linkRef + ")");
+ Sender sender = session2.createSender(address);
+ sendingLinks.put(linkRef, sender);
+ }
+ else
+ {
+
+ System.err.println("%%% Creating receiver (" + linkRef + ")");
+ Receiver receiver2 = session2.createReceiver(address);
+ receiver2.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+ receivingLinks.put(linkRef, receiver2);
+ }
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ else if(SEND_MESSAGE.equals(op))
+ {
+ Sender sender = sendingLinks.get(map.get(LINK_REF));
+ Properties m2props = new Properties();
+ Object messageId = map.get(MESSAGE_ID);
+ m2props.setMessageId(messageId);
+ Map m2propmap = new HashMap();
+ m2propmap.put(OPCODE, TEST);
+ m2propmap.put(VENDOR, vendor);
+ ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
+ Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
+ sender.send(m2);
+
+ Map m3propmap = new HashMap();
+ m3propmap.put(OPCODE, LOG);
+ m3propmap.put(ACTION, SENT);
+ m3propmap.put(MESSAGE_ID, messageId);
+ m3propmap.put(VENDOR, vendor);
+ m3propmap.put(MESSAGE_VENDOR, vendor);
+
+
+ Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
+ new AmqpValue("AMQP-"+messageId)));
+ s.send(m3);
+
+ }
+
+ responseReceiver.acknowledge(m);
+ }
+ else
+ {
+ for(Map.Entry<Object, Receiver> entry : receivingLinks.entrySet())
+ {
+ m = entry.getValue().receive(false);
+ if(m != null)
+ {
+ wait = false;
+
+ System.err.println("%%% Received message from " + entry.getKey());
+
+ Properties mp = m.getProperties();
+ ApplicationProperties ap = m.getApplicationProperties();
+
+ Map m3propmap = new HashMap();
+ m3propmap.put(OPCODE, LOG);
+ m3propmap.put(ACTION, RECEIVED);
+ m3propmap.put(MESSAGE_ID, mp.getMessageId());
+ m3propmap.put(VENDOR, vendor);
+ m3propmap.put(MESSAGE_VENDOR, ap.getValue().get(VENDOR));
+
+ Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
+ new AmqpValue("AMQP-"+mp.getMessageId())));
+ s.send(m3);
+
+ entry.getValue().acknowledge(m);
+ }
+
+ }
+ }
+
+ if(wait)
+ {
+ try
+ {
+ Thread.sleep(500l);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ }
+
+ }
+
+
+
+
+
+
+
+
+
+ s.close();
+ session.close();
+ conn.close();
+
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ protected boolean hasSingleLinkPerConnectionMode()
+ {
+ return false;
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java new file mode 100644 index 0000000000..f61fd64a61 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java @@ -0,0 +1,116 @@ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.commons.cli.Options; + +public class Dump extends Util +{ + private static final String USAGE_STRING = "dump [options] <address>\n\nOptions:"; + + + protected Dump(String[] args) + { + super(args); + } + + public static void main(String[] args) + { + new Dump(args).run(); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasLinkNameOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasSizeOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasBlockOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasStdInOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasTxnOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasModeOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected boolean hasCountOption() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected void printUsage(Options options) + { + //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + + Sender s = session.createSender(queue, 10); + + Message message = new Message("dump me"); + message.setDeliveryTag(new Binary("dump".getBytes())); + + s.send(message); + + s.close(); + session.close(); + conn.close(); + + } catch (Connection.ConnectionException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java new file mode 100644 index 0000000000..43ddd6ca25 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java @@ -0,0 +1,327 @@ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.*; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.*; + +public class Filereceiver extends Util +{ + private static final String USAGE_STRING = "filereceiver [options] <address> <directory>\n\nOptions:"; + + protected Filereceiver(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return false; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + final String directoryName = getArgs()[1]; + + try + { + Connection conn = newConnection(); + + Session session = conn.createSession(); + + final File directory = new File(directoryName); + if(directory.isDirectory() && directory.canWrite()) + { + File tmpDirectory = new File(directoryName, ".tmp"); + if(!tmpDirectory.exists()) + { + tmpDirectory.mkdir(); + } + + String[] unsettledFiles = tmpDirectory.list(); + + Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>(); + final Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>(); + + Accepted accepted = new Accepted(); + + for(String fileName : unsettledFiles) + { + File theFile = new File(tmpDirectory, fileName); + if(theFile.isFile()) + { + if(fileName.startsWith("~") && fileName.endsWith("~")) + { + theFile.delete(); + } + else + { + int splitPoint = fileName.indexOf("."); + String deliveryTagStr = fileName.substring(0,splitPoint); + String actualFileName = fileName.substring(splitPoint+1); + + byte[] bytes = new byte[deliveryTagStr.length()/2]; + + + for(int i = 0; i < bytes.length; i++) + { + char c = deliveryTagStr.charAt(2*i); + char d = deliveryTagStr.charAt(1+(2*i)); + + bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) + | (d <= '9' ? d - '0' : d - 'W')); + + } + Binary deliveryTag = new Binary(bytes); + unsettled.put(deliveryTag, accepted); + unsettledFileNames.put(deliveryTag, fileName); + } + } + + } + + Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), + unsettled); + + Map<Binary, Outcome> remoteUnsettled = r.getRemoteUnsettled(); + + for(Map.Entry<Binary, String> entry : unsettledFileNames.entrySet()) + { + if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) + { + + File tmpFile = new File(tmpDirectory, entry.getValue()); + final File dest = new File(directory, + entry.getValue().substring(entry.getValue().indexOf(".") + 1)); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + + tmpFile.renameTo(dest); + } + } + + + int credit = 10; + + r.setCredit(UnsignedInteger.valueOf(credit), true); + + + int received = 0; + Message m = null; + do + { + m = isBlock() && received == 0 ? r.receive() : r.receive(10000); + if(m != null) + { + if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) + { + final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); + final File unsettledFile = new File(tmpDirectory, + tmpFileName); + r.acknowledge(m, new Receiver.SettledAction() + { + public void onSettled(final Binary deliveryTag) + { + int splitPoint = tmpFileName.indexOf("."); + + String fileName = tmpFileName.substring(splitPoint+1); + + final File dest = new File(directory, fileName); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + unsettledFile.renameTo(dest); + unsettledFileNames.remove(deliveryTag); + } + }); + } + else + { + received++; + List<Section> sections = m.getPayload(); + Binary deliveryTag = m.getDeliveryTag(); + StringBuilder tagNameBuilder = new StringBuilder(); + + ByteBuffer dtbuf = deliveryTag.asByteBuffer(); + while(dtbuf.hasRemaining()) + { + tagNameBuilder.append(String.format("%02x", dtbuf.get())); + } + + + ApplicationProperties properties = null; + List<Binary> data = new ArrayList<Binary>(); + int totalSize = 0; + for(Section section : sections) + { + if(section instanceof ApplicationProperties) + { + properties = (ApplicationProperties) section; + } + else if(section instanceof AmqpValue) + { + AmqpValue value = (AmqpValue) section; + if(value.getValue() instanceof Binary) + { + Binary binary = (Binary) value.getValue(); + data.add(binary); + totalSize += binary.getLength(); + + } + else + { + // TODO exception + } + } + else if(section instanceof Data) + { + Data value = (Data) section; + Binary binary = value.getValue(); + data.add(binary); + totalSize += binary.getLength(); + + } + } + if(properties != null) + { + final String fileName = (String) properties.getValue().get("filename"); + byte[] fileData = new byte[totalSize]; + ByteBuffer buf = ByteBuffer.wrap(fileData); + int offset = 0; + for(Binary bin : data) + { + buf.put(bin.asByteBuffer()); + } + File outputFile = new File(tmpDirectory, "~"+fileName+"~"); + if(outputFile.exists()) + { + outputFile.delete(); + } + FileOutputStream fos = new FileOutputStream(outputFile); + fos.write(fileData); + fos.flush(); + fos.close(); + + final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + + fileName); + outputFile.renameTo(unsettledFile); + r.acknowledge(m, new Receiver.SettledAction() + { + public void onSettled(final Binary deliveryTag) + { + final File dest = new File(directory, fileName); + if(dest.exists()) + { + System.err.println("Duplicate detected - filename " + dest.getName()); + } + unsettledFile.renameTo(dest); + + } + }); + + } + } + } + } + while(m != null); + + + r.close(); + } + else + { + System.err.println("No such directory: " + directoryName); + } + session.close(); + conn.close(); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); + } + catch (FileNotFoundException e) + { + e.printStackTrace(); //TODO. + } + catch (IOException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + public static void main(String[] args) + { + new Filereceiver(args).run(); + } +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java new file mode 100644 index 0000000000..83b305ac03 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java @@ -0,0 +1,276 @@ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; + +public class Filesender extends Util +{ + private static final String USAGE_STRING = "filesender [options] <address> <directory>\n\nOptions:"; + + protected Filesender(String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return false; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return false; + } + + @Override + protected boolean hasModeOption() + { + return false; + } + + @Override + protected boolean hasCountOption() + { + return false; + } + + @Override + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + + } + + @Override + protected void run() + { + final String queue = getArgs()[0]; + final String directoryName = getArgs()[1]; + + try + { + MessageDigest md5 = MessageDigest.getInstance("MD5"); + Connection conn = newConnection(); + + Session session = conn.createSession(); + + File directory = new File(directoryName); + if(directory.isDirectory() && directory.canWrite()) + { + + File tmpDirectory = new File(directoryName, ".tmp"); + if(!tmpDirectory.exists()) + { + tmpDirectory.mkdir(); + } + + String[] unsettledFiles = tmpDirectory.list(); + + + + Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>(); + Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>(); + for(String fileName : unsettledFiles) + { + File aFile = new File(tmpDirectory, fileName); + if(aFile.canRead() && aFile.canWrite()) + { + Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); + unsettled.put(deliveryTag, null); + unsettledFileNames.put(deliveryTag, fileName); + } + } + + + Sender s = session.createSender(queue, 10, AcknowledgeMode.EO, getLinkName(), isDurableLink(), + unsettled); + + Map<Binary, DeliveryState> remoteUnsettled = s.getRemoteUnsettled(); + + for(Map.Entry<Binary, String> entry: unsettledFileNames.entrySet()) + { + if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) + { + (new File(tmpDirectory, entry.getValue())).renameTo(new File(directory, entry.getValue())); + } + } + + if(remoteUnsettled != null) + { + for(Map.Entry<Binary, DeliveryState> entry : remoteUnsettled.entrySet()) + { + if(entry.getValue() instanceof Accepted) + { + final String fileName = unsettledFileNames.get(entry.getKey()); + if(fileName != null) + { + + Message resumed = new Message(); + resumed.setDeliveryTag(entry.getKey()); + resumed.setDeliveryState(entry.getValue()); + resumed.setResume(Boolean.TRUE); + resumed.setSettled(Boolean.TRUE); + + + + final File unsettledFile = new File(tmpDirectory, fileName); + unsettledFile.delete(); + + s.send(resumed); + + } + + } + else if(entry.getValue() instanceof Received || entry.getValue() == null) + { + final File unsettledFile = new File(tmpDirectory, unsettledFileNames.get(entry.getKey())); + Message resumed = createMessageFromFile(md5, unsettledFileNames.get(entry.getKey()), unsettledFile); + resumed.setResume(Boolean.TRUE); + Sender.OutcomeAction action = new Sender.OutcomeAction() + { + public void onOutcome(Binary deliveryTag, Outcome outcome) + { + if(outcome instanceof Accepted) + { + unsettledFile.delete(); + } + } + }; + s.send(resumed, action); + + } + } + } + + + + String[] files = directory.list(); + + for(String fileName : files) + { + final File file = new File(directory, fileName); + + if(file.canRead() && file.canWrite() && !file.isDirectory()) + { + Message message = createMessageFromFile(md5, fileName, file); + + final File unsettledFile = new File(tmpDirectory, fileName); + + Sender.OutcomeAction action = new Sender.OutcomeAction() + { + public void onOutcome(Binary deliveryTag, Outcome outcome) + { + if(outcome instanceof Accepted) + { + unsettledFile.delete(); + } + } + }; + + file.renameTo(unsettledFile); + + s.send(message, action); + } + } + + s.close(); + } + else + { + System.err.println("No such directory: " + directory); + } + session.close(); + conn.close(); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); + } catch (FileNotFoundException e) + { + e.printStackTrace(); + } catch (IOException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (NoSuchAlgorithmException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + } + + private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException + { + FileInputStream fis = new FileInputStream(file); + byte[] data = new byte[(int) file.length()]; + + int read = fis.read(data); + + fis.close(); + + Section applicationProperties = new ApplicationProperties(Collections.singletonMap("filename", fileName)); + Section amqpValue = new Data(new Binary(data)); + Message message = new Message(Arrays.asList(applicationProperties, amqpValue)); + Binary deliveryTag = new Binary(md5.digest(fileName.getBytes())); + message.setDeliveryTag(deliveryTag); + md5.reset(); + return message; + } + + public static void main(String[] args) + { + new Filesender(args).run(); + } +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java new file mode 100644 index 0000000000..7c1172898b --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Message.java @@ -0,0 +1,148 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class Message
+{
+ private Binary _deliveryTag;
+ private List<Section> _payload = new ArrayList<Section>();
+ private Boolean _resume;
+ private boolean _settled;
+ private DeliveryState _deliveryState;
+ private Receiver _receiver;
+
+
+ public Message()
+ {
+ }
+
+ public Message(Collection<Section> sections)
+ {
+ _payload.addAll(sections);
+ }
+
+ public Message(Section section)
+ {
+ this(Collections.singletonList(section));
+ }
+
+ public Message(String message)
+ {
+ this(new AmqpValue(message));
+ }
+
+
+ public Binary getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+ public void setDeliveryTag(Binary deliveryTag)
+ {
+ _deliveryTag = deliveryTag;
+ }
+
+ public List<Section> getPayload()
+ {
+ return Collections.unmodifiableList(_payload);
+ }
+
+ private <T extends Section> T getSection(Class<T> clazz)
+ {
+ for(Section s : _payload)
+ {
+ if(clazz.isAssignableFrom(s.getClass()))
+ {
+ return (T) s;
+ }
+ }
+ return null;
+ }
+
+ public ApplicationProperties getApplicationProperties()
+ {
+ return getSection(ApplicationProperties.class);
+ }
+
+ public Properties getProperties()
+ {
+ return getSection(Properties.class);
+ }
+
+ public Header getHeader()
+ {
+ return getSection(Header.class);
+ }
+
+
+ public void setResume(final Boolean resume)
+ {
+ _resume = resume;
+ }
+
+ public boolean isResume()
+ {
+ return Boolean.TRUE.equals(_resume);
+ }
+
+ public void setDeliveryState(DeliveryState state)
+ {
+ _deliveryState = state;
+ }
+
+ public DeliveryState getDeliveryState()
+ {
+ return _deliveryState;
+ }
+
+ public void setSettled(boolean settled)
+ {
+ _settled = settled;
+ }
+
+ public boolean getSettled()
+ {
+ return _settled;
+ }
+
+ public void setReceiver(final Receiver receiver)
+ {
+ _receiver = receiver;
+ }
+
+ public Receiver getReceiver()
+ {
+ return _receiver;
+ }
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java new file mode 100644 index 0000000000..07ae54b54f --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ReadBytes.java @@ -0,0 +1,77 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.codec.ValueHandler;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class ReadBytes
+{
+
+ public static void main(String[] args) throws IOException, AmqpErrorException
+ {
+
+ if(args.length == 0)
+ {
+ readBytes(System.in);
+ }
+ else
+ {
+ for(String fileName : args)
+ {
+ System.out.println("=========================== " + fileName + " ===========================");
+ final FileInputStream fis = new FileInputStream(fileName);
+ readBytes(fis);
+ fis.close();
+ }
+ }
+
+ }
+
+ private static void readBytes(final InputStream inputStream) throws IOException, AmqpErrorException
+ {
+ byte[] bytes = new byte[4096];
+
+ ValueHandler valueHandler = new ValueHandler(AMQPDescribedTypeRegistry.newInstance());
+
+ int count;
+
+ while((count = inputStream.read(bytes))!=-1)
+ {
+ ByteBuffer buf = ByteBuffer.wrap(bytes);
+ buf.limit(count);
+ while(buf.hasRemaining())
+ {
+
+ final Object value = valueHandler.parse(buf);
+ System.out.print((value == null ? "" : value.getClass().getName() + ":") +value +"\n");
+
+ }
+ }
+
+ }
+
+
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java new file mode 100644 index 0000000000..0da9dc3fb7 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java @@ -0,0 +1,246 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.commons.cli.*; +import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; +import org.apache.qpid.amqp_1_0.type.messaging.Filter; +import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; + +import java.util.Collections; + +public class Receive extends Util +{ + private static final String USAGE_STRING = "receive [options] <address> \n\nOptions:"; + private static final UnsignedLong UNSIGNED_LONG_ONE = UnsignedLong.valueOf(1L); + private UnsignedLong _lastCorrelationId; + + public static void main(String[] args) + { + new Receive(args).run(); + } + + + public Receive(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return true; + } + + @Override + protected boolean hasLinkNameOption() + { + return true; + } + + @Override + protected boolean hasResponseQueueOption() + { + return false; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return true; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + @Override + protected boolean hasFilterOption() + { + return true; + } + + protected void run() + { + + try + { + final String queue = getArgs()[0]; + + String message = ""; + + Connection conn = newConnection(); + + + Session session = conn.createSession(); + + Filter filter = null; + if(getFilter() != null) + { + String[] filterParts = getFilter().split("=",2); + if("exact-subject".equals(filterParts[0])) + { + filter = new ExactSubjectFilter(filterParts[1]); + } + else if("matching-subject".equals(filterParts[0])) + { + filter = new MatchingSubjectFilter(filterParts[1]); + } + else + { + System.err.println("Unknown filter type: " + filterParts[0]); + } + } + + Receiver r = + filter == null + ? session.createReceiver(queue, getMode(), getLinkName(), isDurableLink()) + : session.createReceiver(queue, getMode(), getLinkName(), isDurableLink(), Collections.singletonMap(Symbol.valueOf("filter"), filter), null); + Transaction txn = null; + + int credit = 0; + int receivedCount = 0; + + if(!useStdIn()) + { + if(getArgs().length <= 2) + { + + Transaction txn2 = null; + if(useTran()) + { + txn = session.createSessionLocalTransaction(); + txn2 = session.createSessionLocalTransaction(); + } + + for(int i = 0; i < getCount(); i++) + { + + if(credit == 0) + { + if(getCount() - i <= getWindowSize()) + { + credit = getCount() - i; + + } + else + { + credit = getWindowSize(); + + } + + { + r.setCredit(UnsignedInteger.valueOf(credit), false); + } + if(!isBlock()) + r.drain(); + } + + Message m = isBlock() ? r.receive() : r.receive(1000L); + credit--; + if(m==null) + { + break; + } + + + + r.acknowledge(m.getDeliveryTag(),txn); + + receivedCount++; + + System.out.println("Received Message : " + m.getPayload()); + } + + if(useTran()) + { + txn.commit(); + } + } + else + { + // TODO + } + } + else + { + // TODO + } + r.close(); + session.close(); + conn.close(); + System.out.println("Total Messages Received: " + receivedCount); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java new file mode 100644 index 0000000000..c5b3cd35ab --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -0,0 +1,548 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class Receiver implements DeliveryStateHandler
+{
+ private ReceivingLinkEndpoint _endpoint;
+ private int _id;
+ private static final UnsignedInteger DEFAULT_INITIAL_CREDIT = UnsignedInteger.valueOf(100);
+ private Session _session;
+
+ private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
+ private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
+ private MessageArrivalListener _messageArrivalListener;
+ private org.apache.qpid.amqp_1_0.type.transport.Error _error;
+
+ public Receiver(final Session session,
+ final String linkName,
+ final Target target,
+ final Source source,
+ final AcknowledgeMode ackMode) throws AmqpErrorException
+ {
+ this(session, linkName, target, source, ackMode, false);
+ }
+
+ public Receiver(final Session session,
+ final String linkName,
+ final Target target,
+ final Source source,
+ final AcknowledgeMode ackMode,
+ boolean isDurable) throws AmqpErrorException
+ {
+ this(session,linkName,target,source,ackMode,isDurable,null);
+ }
+
+ public Receiver(final Session session,
+ final String linkName,
+ final Target target,
+ final Source source,
+ final AcknowledgeMode ackMode,
+ final boolean isDurable,
+ final Map<Binary,Outcome> unsettled) throws AmqpErrorException
+ {
+
+ _session = session;
+ if(isDurable)
+ {
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ }
+ _endpoint = session.getEndpoint().createReceivingLinkEndpoint(linkName, target, source,
+ UnsignedInteger.ZERO);
+
+ _endpoint.setDeliveryStateHandler(this);
+
+ switch(ackMode)
+ {
+ case ALO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case AMO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case EO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+ break;
+
+ }
+
+ _endpoint.setLinkEventListener(new ReceivingLinkListener.DefaultLinkEventListener()
+ {
+ @Override public void messageTransfer(final Transfer xfr)
+ {
+ _prefetchQueue.add(xfr);
+ postPrefetchAction();
+ }
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ _error = detach.getError();
+ super.remoteDetached(endpoint, detach);
+ }
+ });
+
+ _endpoint.setLocalUnsettled(unsettled);
+ _endpoint.attach();
+
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isAttached() && !_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ if(_endpoint.getSource() == null)
+ {
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ throw new AmqpErrorException(getError());
+ }
+ else
+ {
+
+ }
+ }
+
+ private void postPrefetchAction()
+ {
+ if(_messageArrivalListener != null)
+ {
+ _messageArrivalListener.messageArrived(this);
+ }
+ }
+
+ public void setCredit(UnsignedInteger credit, boolean window)
+ {
+ _endpoint.setLinkCredit(credit);
+ _endpoint.setCreditWindow(window);
+
+ }
+
+
+ public String getAddress()
+ {
+ return ((Source)_endpoint.getSource()).getAddress();
+ }
+
+ public Map getFilter()
+ {
+ return ((Source)_endpoint.getSource()).getFilter();
+ }
+
+ public Message receive()
+ {
+ return receive(-1L);
+ }
+
+ public Message receive(boolean wait)
+ {
+ return receive(wait ? -1L : 0L);
+ }
+
+ // 0 means no wait, -1 wait forever
+ public Message receive(long wait)
+ {
+ Message m = null;
+ Transfer xfr;
+ long endTime = wait > 0L ? System.currentTimeMillis() + wait : 0L;
+
+ while((xfr = receiveFromPrefetch(wait)) != null )
+ {
+
+ if(!Boolean.TRUE.equals(xfr.getAborted()))
+ {
+ Binary deliveryTag = xfr.getDeliveryTag();
+ Boolean resume = xfr.getResume();
+
+ List<Section> sections = new ArrayList<Section>();
+ List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
+ int totalSize = 0;
+
+ boolean hasMore;
+ do
+ {
+ hasMore = Boolean.TRUE.equals(xfr.getMore());
+
+ ByteBuffer buf = xfr.getPayload();
+
+ if(buf != null)
+ {
+
+ totalSize += buf.remaining();
+
+ payloads.add(buf);
+ }
+ if(hasMore)
+ {
+ xfr = receiveFromPrefetch(0L);
+ if(xfr== null)
+ {
+ // TODO - this is wrong!!!!
+ System.out.println("eeek");
+ }
+ }
+ }
+ while(hasMore && !Boolean.TRUE.equals(xfr.getAborted()));
+
+ if(!Boolean.TRUE.equals(xfr.getAborted()))
+ {
+ ByteBuffer allPayload = ByteBuffer.allocate(totalSize);
+ for(ByteBuffer payload : payloads)
+ {
+ allPayload.put(payload);
+ }
+ allPayload.flip();
+ SectionDecoder decoder = _session.getSectionDecoder();
+
+ try
+ {
+ sections = decoder.parseAll(allPayload);
+ }
+ catch (AmqpErrorException e)
+ {
+ // todo - throw a sensible error
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ m = new Message(sections);
+ m.setDeliveryTag(deliveryTag);
+ m.setResume(resume);
+ m.setReceiver(this);
+ break;
+ }
+ }
+
+ if(wait > 0L)
+ {
+ wait = endTime - System.currentTimeMillis();
+ if(wait <=0L)
+ {
+ break;
+ }
+ }
+ }
+
+
+ return m;
+
+ }
+
+ private Transfer receiveFromPrefetch(long wait)
+ {
+ long endTime = ((wait >0L) ? (System.currentTimeMillis() + wait) : 0L);
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ Transfer xfr;
+ while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && !_endpoint.isDetached()
+ && wait != 0)
+ {
+ try
+ {
+ if(wait>0L)
+ {
+ lock.wait(wait);
+ }
+ else if(wait<0L)
+ {
+ lock.wait();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ if(wait > 0L)
+ {
+ wait = endTime - System.currentTimeMillis();
+ if(wait <= 0L)
+ {
+ break;
+ }
+ }
+
+ }
+ if(xfr != null)
+ {
+ _prefetchQueue.poll();
+
+ }
+
+ return xfr;
+ }
+
+ }
+
+
+ public void release(final Message m)
+ {
+ release(m.getDeliveryTag());
+ }
+
+ public void release(Binary deliveryTag)
+ {
+ update(new Released(), deliveryTag, null, null);
+ }
+
+
+ public void modified(Binary tag)
+ {
+ update(new Modified(), tag, null, null);
+ }
+
+ public void acknowledge(final Message m)
+ {
+ acknowledge(m.getDeliveryTag());
+ }
+
+ public void acknowledge(final Message m, SettledAction a)
+ {
+ acknowledge(m.getDeliveryTag(), a);
+ }
+
+
+ public void acknowledge(final Message m, Transaction txn)
+ {
+ acknowledge(m.getDeliveryTag(), txn);
+ }
+
+
+ public void acknowledge(final Binary deliveryTag)
+ {
+ acknowledge(deliveryTag, null, null);
+ }
+
+
+ public void acknowledge(final Binary deliveryTag, SettledAction a)
+ {
+ acknowledge(deliveryTag, null, a);
+ }
+
+ public void acknowledge(final Binary deliveryTag, final Transaction txn)
+ {
+ acknowledge(deliveryTag, txn, null);
+ }
+
+ public void acknowledge(final Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+ update(new Accepted(), deliveryTag, txn, action);
+ }
+
+ public void update(Outcome outcome, final Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+
+ DeliveryState state;
+ if(txn != null)
+ {
+ TransactionalState txnState = new TransactionalState();
+ txnState.setOutcome(outcome);
+ txnState.setTxnId(txn.getTxnId());
+ state = txnState;
+ }
+ else
+ {
+ state = (DeliveryState) outcome;
+ }
+ boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
+
+ if(!(settled || action == null))
+ {
+ _unsettledMap.put(deliveryTag, action);
+ }
+
+ _endpoint.updateDisposition(deliveryTag,state, settled);
+ }
+
+ public Error getError()
+ {
+ return _error;
+ }
+
+ public void acknowledgeAll(Message m)
+ {
+ acknowledgeAll(m.getDeliveryTag());
+ }
+
+ public void acknowledgeAll(Binary deliveryTag)
+ {
+ acknowledgeAll(deliveryTag, null, null);
+ }
+
+ public void acknowledgeAll(Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+ updateAll(new Accepted(), deliveryTag, txn, action);
+ }
+
+ public void updateAll(Outcome outcome, Binary deliveryTag)
+ {
+ updateAll(outcome, deliveryTag, null, null);
+ }
+
+ public void updateAll(Outcome outcome, Binary deliveryTag, final Transaction txn, SettledAction action)
+ {
+ DeliveryState state;
+
+ if(txn != null)
+ {
+ TransactionalState txnState = new TransactionalState();
+ txnState.setOutcome(outcome);
+ txnState.setTxnId(txn.getTxnId());
+ state = txnState;
+ }
+ else
+ {
+ state = (DeliveryState) outcome;
+ }
+ boolean settled = txn == null && !ReceiverSettleMode.SECOND.equals(_endpoint.getReceivingSettlementMode());
+
+ if(!(settled || action == null))
+ {
+ _unsettledMap.put(deliveryTag, action);
+ }
+ _endpoint.updateAllDisposition(deliveryTag, state, settled);
+ }
+
+
+
+ public void close()
+ {
+ _endpoint.setTarget(null);
+ _endpoint.close();
+ Message msg;
+ while((msg = receive(-1l)) != null)
+ {
+ release(msg);
+ }
+
+ }
+
+
+ public void detach()
+ {
+ _endpoint.setTarget(null);
+ _endpoint.detach();
+ Message msg;
+ while((msg = receive(-1l)) != null)
+ {
+ release(msg);
+ }
+
+ }
+
+ public void drain()
+ {
+ _endpoint.drain();
+ }
+
+ public void setCreditWithTransaction(final UnsignedInteger credit, final Transaction txn)
+ {
+ _endpoint.setLinkCredit(credit);
+ _endpoint.setTransactionId(txn == null ? null : txn.getTxnId());
+ _endpoint.setCreditWindow(false);
+
+ }
+
+ public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+ {
+ if(Boolean.TRUE.equals(settled))
+ {
+ SettledAction action = _unsettledMap.remove(deliveryTag);
+ if(action != null)
+ {
+ action.onSettled(deliveryTag);
+ }
+ }
+ }
+
+ public Map<Binary, Outcome> getRemoteUnsettled()
+ {
+ return _endpoint.getInitialUnsettledMap();
+ }
+
+
+ public void setMessageArrivalListener(final MessageArrivalListener messageArrivalListener)
+ {
+ synchronized(_endpoint.getLock())
+ {
+ _messageArrivalListener = messageArrivalListener;
+ }
+ }
+
+ public Session getSession()
+ {
+ return _session;
+ }
+
+ public static interface SettledAction
+ {
+ public void onSettled(Binary deliveryTag);
+ }
+
+
+ public interface MessageArrivalListener
+ {
+ void messageArrived(Receiver receiver);
+ }
+
+}
\ No newline at end of file diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java new file mode 100644 index 0000000000..6e1d15376c --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java @@ -0,0 +1,249 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+import java.util.Arrays;
+
+public class Request extends Util
+{
+ private static final String USAGE_STRING = "request [options] <address> [<content> ...]\n\nOptions:";
+
+ public static void main(String[] args)
+ {
+ new Request(args).run();
+ }
+
+ public Request(String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return true;
+ }
+
+ public void run()
+ {
+
+ try
+ {
+
+
+ final String queue = getArgs()[0];
+
+ String message = "";
+
+ Connection conn = newConnection();
+ Session session = conn.createSession();
+
+ Connection conn2;
+ Session session2;
+ Receiver responseReceiver;
+
+ if(isUseMultipleConnections())
+ {
+ conn2 = newConnection();
+ session2 = conn2.createSession();
+ responseReceiver = session2.createTemporaryQueueReceiver();
+ }
+ else
+ {
+ conn2 = null;
+ session2 = null;
+ responseReceiver = session.createTemporaryQueueReceiver();
+ }
+
+
+
+
+ responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
+
+
+
+ Sender s = session.createSender(queue, getWindowSize(), getMode());
+
+ Transaction txn = null;
+
+ if(useTran())
+ {
+ txn = session.createSessionLocalTransaction();
+ }
+
+ int received = 0;
+
+ if(getArgs().length >= 2)
+ {
+ message = getArgs()[1];
+ if(message.length() < getMessageSize())
+ {
+ StringBuilder builder = new StringBuilder(getMessageSize());
+ builder.append(message);
+ for(int x = message.length(); x < getMessageSize(); x++)
+ {
+ builder.append('.');
+ }
+ message = builder.toString();
+ }
+
+ for(int i = 0; i < getCount(); i++)
+ {
+ Properties properties = new Properties();
+ properties.setMessageId(UnsignedLong.valueOf(i));
+ properties.setReplyTo(responseReceiver.getAddress());
+
+ AmqpValue amqpValue = new AmqpValue(message);
+ Section[] sections = { new Header() , properties, amqpValue};
+ final Message message1 = new Message(Arrays.asList(sections));
+
+ s.send(message1, txn);
+
+ Message responseMessage = responseReceiver.receive(false);
+ if(responseMessage != null)
+ {
+ responseReceiver.acknowledge(responseMessage.getDeliveryTag(),txn);
+ received++;
+ }
+ }
+ }
+
+ if(txn != null)
+ {
+ txn.commit();
+ }
+
+
+ while(received < getCount())
+ {
+ Message responseMessage = responseReceiver.receive();
+ responseReceiver.acknowledge(responseMessage.getDeliveryTag());
+ received++;
+ }
+
+
+
+
+ s.close();
+ session.close();
+ conn.close();
+
+ if(session2 != null)
+ {
+ session2.close();
+ conn2.close();
+ }
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+ }
+
+ protected boolean hasSingleLinkPerConnectionMode()
+ {
+ return true;
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java new file mode 100644 index 0000000000..8d9de4893f --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java @@ -0,0 +1,347 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.commons.cli.*; + +import java.util.*; + +public class Respond extends Util +{ + private static final String USAGE_STRING = "respond [options] <address>\n\nOptions:"; + private Connection _conn; + private Session _session; + private Receiver _receiver; + private Transaction _txn; + private Map<String,Sender> _senders; + private UnsignedLong _responseMsgId = UnsignedLong.ZERO; + private Connection _conn2; + private Session _session2; + + public Respond(final String[] args) + { + super(args); + } + + @Override + protected boolean hasLinkDurableOption() + { + return false; + } + + @Override + protected boolean hasLinkNameOption() + { + return false; + } + + @Override + protected boolean hasResponseQueueOption() + { + return true; + } + + @Override + protected boolean hasSizeOption() + { + return false; + } + + @Override + protected boolean hasBlockOption() + { + return true; + } + + @Override + protected boolean hasStdInOption() + { + return false; + } + + @Override + protected boolean hasTxnOption() + { + return true; + } + + @Override + protected boolean hasModeOption() + { + return true; + } + + @Override + protected boolean hasCountOption() + { + return true; + } + + @Override + protected boolean hasSingleLinkPerConnectionMode() + { + return true; + } + + + @Override + protected boolean hasWindowSizeOption() + { + return true; + } + + public static void main(String[] args) + { + new Respond(args).run(); + } + + public void run() + { + try + { + + _senders = new HashMap<String, Sender>(); + + final String queue = getArgs()[0]; + + String message = ""; + + _conn = newConnection(); + + + + if(isUseMultipleConnections()) + { + _conn2 = newConnection(); + _session2 = _conn2.createSession(); + } + + + _session = _conn.createSession(); + + + _receiver = _session.createReceiver(queue, getMode()); + _txn = null; + + int credit = 0; + int receivedCount = 0; + _responseMsgId = UnsignedLong.ZERO; + + Random random = null; + int batch = 0; + List<Message> txnMessages = null; + if(useTran()) + { + if(getRollbackRatio() != 0) + { + random = new Random(); + } + batch = getBatchSize(); + _txn = _session.createSessionLocalTransaction(); + txnMessages = new ArrayList<Message>(batch); + } + + + for(int i = 0; receivedCount < getCount(); i++) + { + + if(credit == 0) + { + if(getCount() - i <= getWindowSize()) + { + credit = getCount() - i; + + } + else + { + credit = getWindowSize(); + + } + + _receiver.setCredit(UnsignedInteger.valueOf(credit), false); + + if(!isBlock()) + _receiver.drain(); + } + + Message m = isBlock() ? (receivedCount == 0 ? _receiver.receive() : _receiver.receive(10000L)) : _receiver.receive(1000L); + credit--; + if(m==null) + { + if(useTran() && batch != getBatchSize()) + { + _txn.commit(); + } + break; + } + + System.out.println("Received Message: " + m.getPayload()); + + respond(m); + + + + if(useTran()) + { + + txnMessages.add(m); + + if(--batch == 0) + { + + if(getRollbackRatio() == 0 || random.nextDouble() >= getRollbackRatio()) + { + _txn.commit(); + txnMessages.clear(); + receivedCount += getBatchSize(); + } + else + { + System.out.println("Random Rollback"); + _txn.rollback(); + double result; + do + { + _txn = _session.createSessionLocalTransaction(); + + for(Message msg : txnMessages) + { + respond(msg); + } + + result = random.nextDouble(); + if(result<getRollbackRatio()) + { + _txn.rollback(); + } + else + { + _txn.commit(); + txnMessages.clear(); + receivedCount += getBatchSize(); + } + } + while(result < getRollbackRatio()); + } + _txn = _session.createSessionLocalTransaction(); + + batch = getBatchSize(); + } + } + else + { + receivedCount++; + } + + } + + + for(Sender s : _senders.values()) + { + s.close(); + } + + _receiver.close(); + _session.close(); + _conn.close(); + System.out.println("Received: " + receivedCount); + } + catch (Connection.ConnectionException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderClosingException e) + { + e.printStackTrace(); //TODO. + } + catch (Sender.SenderCreationException e) + { + e.printStackTrace(); //TODO. + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //TODO. + } + } + + private void respond(Message m) throws Sender.SenderCreationException + { + List<Section> sections = m.getPayload(); + String replyTo = null; + Object correlationId = null; + for(Section section : sections) + { + if(section instanceof Properties) + { + replyTo = getResponseQueue() == null ? ((Properties)section).getReplyTo() : getResponseQueue(); + correlationId = ((Properties) section).getMessageId(); + break; + } + } + + if(replyTo != null) + { + Sender s = _senders.get(replyTo); + if(s == null) + { + s = (isUseMultipleConnections() ? _session2 : _session).createSender(replyTo,getWindowSize()); + _senders.put(replyTo, s); + } + + List<Section> replySections = new ArrayList<Section>(sections); + + ListIterator<Section> sectionIterator = replySections.listIterator(); + + while(sectionIterator.hasNext()) + { + Section section = sectionIterator.next(); + if(section instanceof Properties) + { + Properties newProps = new Properties(); + newProps.setTo(replyTo); + newProps.setCorrelationId(correlationId); + newProps.setMessageId(_responseMsgId); + _responseMsgId = _responseMsgId.add(UnsignedLong.ONE); + sectionIterator.set(newProps); + } + } + + Message replyMessage = new Message(replySections); + System.out.println("Sent Message: " + replySections); + s.send(replyMessage, _txn); + + } + _receiver.acknowledge(m.getDeliveryTag(), _txn); + } + + protected void printUsage(Options options) + { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(USAGE_STRING, options ); + } + +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java new file mode 100644 index 0000000000..6f6575e083 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java @@ -0,0 +1,244 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.util.Arrays;
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.commons.cli.*;
+
+public class Send extends Util
+{
+ private static final String USAGE_STRING = "send [options] <address> [<content> ...]\n\nOptions:";
+ private static final char[] HEX = {'0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F'};
+
+
+ public static void main(String[] args) throws Sender.SenderCreationException, Sender.SenderClosingException, Connection.ConnectionException
+ {
+ new Send(args).run();
+ }
+
+
+ public Send(final String[] args)
+ {
+ super(args);
+ }
+
+ @Override
+ protected boolean hasLinkDurableOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasLinkNameOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasResponseQueueOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasBlockOption()
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean hasStdInOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasTxnOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasModeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasCountOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasWindowSizeOption()
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean hasSubjectOption()
+ {
+ return true;
+ }
+
+ public void run()
+ {
+
+ final String queue = getArgs()[0];
+
+ String message = "";
+
+ try
+ {
+ Connection conn = newConnection();
+
+ Session session = conn.createSession();
+
+
+ Sender s = session.createSender(queue, getWindowSize(), getMode(), getLinkName());
+
+ Transaction txn = null;
+
+ if(useTran())
+ {
+ txn = session.createSessionLocalTransaction();
+ }
+
+ if(!useStdIn())
+ {
+ if(getArgs().length <= 2)
+ {
+ if(getArgs().length == 2)
+ {
+ message = getArgs()[1];
+ }
+ for(int i = 0; i < getCount(); i++)
+ {
+
+ Properties properties = new Properties();
+ properties.setMessageId(UnsignedLong.valueOf(i));
+ if(getSubject() != null)
+ {
+ properties.setSubject(getSubject());
+ }
+ Section bodySection;
+ byte[] bytes = (message + " " + i).getBytes();
+ if(bytes.length < getMessageSize())
+ {
+ byte[] origBytes = bytes;
+ bytes = new byte[getMessageSize()];
+ System.arraycopy(origBytes,0,bytes,0,origBytes.length);
+ for(int x = origBytes.length; x < bytes.length; x++)
+ {
+ bytes[x] = (byte) '.';
+ }
+ bodySection = new Data(new Binary(bytes));
+ }
+ else
+ {
+ bodySection = new AmqpValue(message + " " + i);
+ }
+
+ Section[] sections = {properties, bodySection};
+ final Message message1 = new Message(Arrays.asList(sections));
+
+ s.send(message1, txn);
+ }
+ }
+ else
+ {
+ for(int i = 1; i < getArgs().length; i++)
+ {
+ s.send(new Message(getArgs()[i]), txn);
+ }
+
+ }
+ }
+ else
+ {
+ LineNumberReader buf = new LineNumberReader(new InputStreamReader(System.in));
+
+
+ try
+ {
+ while((message = buf.readLine()) != null)
+ {
+ s.send(new Message(message), txn);
+ }
+ }
+ catch (IOException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ }
+
+ if(txn != null)
+ {
+ txn.commit();
+ }
+
+ s.close();
+
+ session.close();
+ conn.close();
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Connection.ConnectionException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
+
+
+ }
+
+ protected void printUsage(Options options)
+ {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(USAGE_STRING, options );
+ }
+
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java new file mode 100644 index 0000000000..6f97ecd810 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SendBytes.java @@ -0,0 +1,331 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.codec.FrameWriter;
+import org.apache.qpid.amqp_1_0.codec.ValueWriter;
+import org.apache.qpid.amqp_1_0.framing.AMQFrame;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.FrameBody;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.UnsignedLong;
+import org.apache.qpid.amqp_1_0.type.UnsignedShort;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.Footer;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.amqp_1_0.type.transport.Flow;
+
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class SendBytes
+{
+
+ public static void main(String[] args) throws
+ Sender.SenderCreationException,
+ Sender.SenderClosingException,
+ Connection.ConnectionException,
+ IOException, ParseException
+ {
+ Transfer xfr = new Transfer();
+ Flow fs = new Flow();
+ fs.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ fs.setDeliveryCount(UnsignedInteger.valueOf(2));
+ fs.setLinkCredit(UnsignedInteger.valueOf(18));
+ fs.setAvailable(UnsignedInteger.valueOf(0));
+ fs.setDrain(false);
+
+ xfr.setHandle(UnsignedInteger.valueOf(0));
+ xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes()));
+ //xfr.setDeliveryTag(new Binary(new byte[] {0}));
+ xfr.setDeliveryId(UnsignedInteger.valueOf(0));
+ xfr.setSettled(true);
+
+
+ Header h = new Header();
+ Properties p = new Properties();
+ p.setTo("queue");
+ //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes()));
+
+ Footer f = new Footer(Collections.EMPTY_MAP);
+
+ Section[] sections = new Section[] { h,p,f};
+ //Section[] sections = new Section[] { b };
+ //Section[] sections = { h,p, b};
+/*
+ Fragment[] fragments = new Fragment[5];
+
+ final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer();
+
+ SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry);
+
+ int num = 0;
+ int i = 0;
+ for(Section s : sections)
+ {
+ Fragment frag = new Fragment();
+
+ frag.setPayload(s.encode(encoder));
+ frag.setFirst(true);
+ frag.setLast(true);
+ frag.setSectionCode(s.getSectionCode());
+ frag.setSectionNumber(UnsignedInteger.valueOf(num++));
+ frag.setSectionOffset(UnsignedLong.valueOf(0L));
+ fragments[i++] =frag;
+ }
+
+ xfr.setFragments(fragments);
+*/
+
+ encodeTypes("xfr",xfr);
+
+ final byte[] result;
+ final Object input = xfr;
+/*
+ result = encode(1024, input);
+
+ boolean ok = true;
+
+ for(int j = 10; ok && j < 400; j++)
+ {
+
+ byte[] result2 = encode(j,input);
+
+ for(int i = 0; i <400; i++)
+ {
+ if(result[i] != result2[i])
+ {
+ System.out.println("result differs at " + i + " Splitting at " + j+ " [" + result[i] + " - " + result2[i] + "]");
+ //break;
+ //ok = false;
+
+ }
+ }
+ }*/
+ //System.out.println(Arrays.equals(result, result2));
+
+ //doEncodes();
+ /*OutputStream out = System.out;
+ if(args.length > 0)
+ {
+ out = new FileOutputStream(args[0]);
+ }
+
+ Transfer xfr = new Transfer();
+ fs.setSessionCredit(UnsignedInteger.valueOf(1024));
+ fs.setTransferCount(UnsignedInteger.valueOf(2));
+ fs.setLinkCredit(UnsignedInteger.valueOf(18));
+ fs.setAvailable(UnsignedInteger.valueOf(0));
+ fs.setDrain(false);
+
+ xfr.setHandle(UnsignedInteger.valueOf(0));
+ //xfr.setDeliveryTag(new Binary("\"queue\"<-6ec024a7-d98e-4196-9348-15f6026c32ca:0".getBytes()));
+ xfr.setDeliveryTag(new Binary(new byte[] {0}));
+ xfr.setTransferId(UnsignedInteger.valueOf(0));
+ xfr.setSettled(true);
+ xfr.setFlowState(fs);
+
+ Header h = new Header();
+ h.setTransmitTime(new Date(System.currentTimeMillis()));
+ Properties p = new Properties();
+ p.setTo(new Address("queue"));
+ //p.setMessageId(new Binary(UUID.randomUUID().toString().getBytes()));
+ AmqpMapSection m = new AmqpMapSection();
+ DataSection b = new DataSection("Hello World!".getBytes());
+
+ Footer f = new Footer();
+
+ Section[] sections = new Section[] { h,p,m,b,f};
+ //Section[] sections = new Section[] { b };
+ //Section[] sections = { h,p, b};
+ List<Fragment> fragments = new ArrayList<Fragment>(5);
+
+ final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
+
+ SectionEncoderImpl encoder = new SectionEncoderImpl(typeRegistry);
+
+ for(Section s : sections)
+ {
+ Fragment frag = new Fragment();
+
+ frag.setPayload(s.encode(encoder));
+ frag.setFirst(true);
+ frag.setLast(true);
+ frag.setFormatCode(s.getSectionCode());
+ frag.setFragmentOffset(null);
+ fragments.add(frag);
+ }
+
+ xfr.setFragments(fragments);
+
+
+ Object[] objectsToWrite = new Object[] { xfr };
+ ByteBuffer buf = ByteBuffer.allocate(4096);
+
+
+ for(Object obj : objectsToWrite)
+ {
+ ValueWriter writer = typeRegistry.getValueWriter(obj);
+
+ int count;
+
+
+ do
+ {
+ count = writer.writeToBuffer(buf);
+ out.write(buf.array(), buf.arrayOffset(), count);
+ buf.clear();
+ } while (!writer.isComplete());
+
+ }
+
+ out.flush();
+ out.close();*/
+
+ }
+
+ public static void doEncodes() throws IOException, ParseException
+ {
+ encodeTypes("boolean", Boolean.TRUE, Boolean.FALSE);
+ encodeTypes("ubyte", UnsignedByte.valueOf((byte)0), UnsignedByte.valueOf((byte)1 ),UnsignedByte.valueOf((byte)3), UnsignedByte.valueOf((byte)42), UnsignedByte.valueOf("255"));
+ encodeTypes("byte", Byte.valueOf((byte)0), Byte.valueOf( (byte)1), Byte.valueOf((byte) 3), Byte.valueOf((byte) 42), Byte.valueOf((byte) 127), Byte.valueOf((byte) -1), Byte.valueOf((byte) -3), Byte.valueOf((byte) -42), Byte.valueOf( (byte)-128));
+ encodeTypes("ushort", UnsignedShort.valueOf((short)0), UnsignedShort.valueOf((short)1), UnsignedShort.valueOf((short)3), UnsignedShort.valueOf((short)42), UnsignedShort.valueOf("65535"));
+ encodeTypes("short", Short.valueOf((short)0), Short.valueOf((short)1), Short.valueOf((short)3), Short.valueOf((short)42), Short.valueOf((short)32767), Short.valueOf((short)-1), Short.valueOf((short)-3), Short.valueOf((short)-42), Short.valueOf((short)-32768));
+ encodeTypes("uint",UnsignedInteger.valueOf(0), UnsignedInteger.valueOf(1), UnsignedInteger.valueOf(3), UnsignedInteger.valueOf(42), UnsignedInteger.valueOf("4294967295"));
+ encodeTypes("int", 0, 1, 3, 42, 2147483647, -1, -3, -42, -2147483648);
+ encodeTypes("ulong", UnsignedLong.valueOf(0), UnsignedLong.valueOf(1), UnsignedLong.valueOf(3), UnsignedLong.valueOf(42), UnsignedLong.valueOf("18446744073709551615"));
+ encodeTypes("long", 0l, 1l, 3l, 42l, 9223372036854775807l, -1l, -3l, -42l, -9223372036854775808l);
+ encodeTypes("float", 3.14159);
+ encodeTypes("double", Double.valueOf(3.14159265359));
+ encodeTypes("char", '?');
+
+ SimpleDateFormat df = new SimpleDateFormat("HHa z MMM d yyyy");
+
+ encodeTypes("timestamp", df.parse("9AM PST Dec 6 2010"), df.parse("9AM PST Dec 6 1910"));
+ encodeTypes("uuid", UUID.fromString("f275ea5e-0c57-4ad7-b11a-b20c563d3b71"));
+ encodeTypes("binary", new Binary( new byte[] {(byte)0xDE, (byte)0xAD, (byte)0xBE, (byte)0xEF}), new Binary(new byte[] { (byte)0xCA,(byte)0xFE, (byte)0xBA, (byte)0xBE}));
+ encodeTypes("string", "The quick brown fox jumped over the lazy cow.");
+ encodeTypes("symbol", Symbol.valueOf("connectathon"));
+ encodeTypes("list", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE}));
+ Map map = new HashMap();
+ map.put("one", Long.valueOf(1));
+ map.put("two", Long.valueOf(2));
+ map.put("pi", Double.valueOf(3.14159265359));
+ map.put("list:", Arrays.asList(new Object[] {Long.valueOf(1), "two", Double.valueOf(3.14159265359), null, Boolean.FALSE}));
+ map.put(null, Boolean.TRUE);
+ encodeTypes("map", map);
+ encodeTypes("null", null);
+
+ }
+
+ static void encodeTypes(String name, Object... vals ) throws IOException
+ {
+ FileOutputStream out = new FileOutputStream("/home/rob/"+name+".out");
+ ByteBuffer buf = ByteBuffer.allocate(4096);
+ final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
+
+ if(vals != null)
+ {
+ for(Object obj : vals)
+ {
+ ValueWriter writer = typeRegistry.getValueWriter(obj);
+
+ int count;
+
+
+ do
+ {
+ count = writer.writeToBuffer(buf);
+ out.write(buf.array(), buf.arrayOffset(), count);
+ buf.clear();
+ } while (!writer.isComplete());
+
+ }
+ }
+ else
+ {
+ ValueWriter writer = typeRegistry.getValueWriter(null);
+
+ int count;
+
+
+ do
+ {
+ count = writer.writeToBuffer(buf);
+ out.write(buf.array(), buf.arrayOffset(), count);
+ buf.clear();
+ } while (!writer.isComplete());
+
+ }
+ out.flush();
+ out.close();
+
+ }
+
+ static byte[] encode(int size, Object... vals)
+ {
+ byte[] result = new byte[10000];
+ int pos = 0;
+
+ final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
+ AMQFrame frame = AMQFrame.createAMQFrame((short) 0, (FrameBody) vals[0]);
+ FrameWriter writer = new FrameWriter(typeRegistry);
+ /*for(Object obj : vals)
+ {
+ final AMQPDescribedTypeRegistry typeRegistry = AMQPDescribedTypeRegistry.newInstance();
+ ValueWriter writer = typeRegistry.getValueWriter(obj);
+*/
+ int count;
+
+ ByteBuffer buf = ByteBuffer.wrap(result, pos, size);
+
+ do
+ {
+
+ writer.writeToBuffer(buf);
+ pos = buf.position();
+ buf = ByteBuffer.wrap(result, pos, size);
+ if(!writer.isComplete())
+ {
+ count = 3;
+ }
+
+ } while (!writer.isComplete());
+/*
+
+ }
+*/
+
+ return result;
+
+ }
+
+
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java new file mode 100644 index 0000000000..c20eec6c8e --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -0,0 +1,392 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.Source;
+import org.apache.qpid.amqp_1_0.type.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Sender implements DeliveryStateHandler
+{
+ private SendingLinkEndpoint _endpoint;
+ private int _id;
+ private Session _session;
+ private int _windowSize;
+ private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
+ private boolean _closed;
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
+ throws SenderCreationException
+ {
+ this(session, linkName, targetAddr, sourceAddr, false);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ boolean synchronous)
+ throws SenderCreationException
+ {
+ this(session, linkName, targetAddr, sourceAddr, synchronous ? 1 : 0);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window) throws SenderCreationException
+ {
+ this(session, linkName, targetAddr, sourceAddr, window, AcknowledgeMode.ALO);
+ }
+
+
+ public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+ int window) throws SenderCreationException
+ {
+ this(session, linkName, target, source, window, AcknowledgeMode.ALO);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window, AcknowledgeMode mode)
+ throws SenderCreationException
+ {
+ this(session, linkName, targetAddr, sourceAddr, window, mode, null);
+ }
+
+ public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+ int window, AcknowledgeMode mode)
+ throws SenderCreationException
+ {
+ this(session, linkName, target, source, window, mode, null);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+ throws SenderCreationException
+ {
+ this(session, linkName, targetAddr, sourceAddr, window, mode, false, unsettled);
+ }
+
+ public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr,
+ int window, AcknowledgeMode mode, boolean isDurable, Map<Binary, Outcome> unsettled)
+ throws SenderCreationException
+ {
+ this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
+ }
+
+ private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
+ {
+ org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
+ source.setAddress(sourceAddr);
+ return source;
+ }
+
+ private static org.apache.qpid.amqp_1_0.type.messaging.Target createTarget(final String targetAddr, final boolean isDurable)
+ {
+ org.apache.qpid.amqp_1_0.type.messaging.Target target = new org.apache.qpid.amqp_1_0.type.messaging.Target();
+ target.setAddress(targetAddr);
+ if(isDurable)
+ {
+ target.setDurable(TerminusDurability.UNSETTLED_STATE);
+ target.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ }
+ return target;
+ }
+
+ public Sender(final Session session, final String linkName, final org.apache.qpid.amqp_1_0.type.messaging.Target target, final org.apache.qpid.amqp_1_0.type.messaging.Source source,
+ int window, AcknowledgeMode mode, Map<Binary, Outcome> unsettled)
+ throws SenderCreationException
+ {
+
+ _session = session;
+ _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
+ source, target, unsettled);
+
+
+ switch(mode)
+ {
+ case ALO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ break;
+ case AMO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.SETTLED);
+ break;
+ case EO:
+ _endpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ _endpoint.setReceivingSettlementMode(ReceiverSettleMode.SECOND);
+ break;
+
+ }
+ _endpoint.setDeliveryStateHandler(this);
+ _endpoint.attach();
+ _windowSize = window;
+
+ synchronized(_endpoint.getLock())
+ {
+ while(!(_endpoint.isAttached() || _endpoint.isDetached()))
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new SenderCreationException(e);
+ }
+ }
+ if(_endpoint.getTarget()== null)
+ {
+ throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
+ };
+ }
+ }
+
+ public Source getSource()
+ {
+ return _endpoint.getSource();
+ }
+
+ public Target getTarget()
+ {
+ return _endpoint.getTarget();
+ }
+
+ public void send(Message message)
+ {
+ send(message, null, null);
+ }
+
+ public void send(Message message, final OutcomeAction action)
+ {
+ send(message, null, action);
+ }
+
+ public void send(Message message, final Transaction txn)
+ {
+ send(message, txn, null);
+ }
+
+ public void send(Message message, final Transaction txn, OutcomeAction action)
+ {
+
+ List<Section> sections = message.getPayload();
+
+ Transfer xfr = new Transfer();
+
+ if(sections != null && !sections.isEmpty())
+ {
+ SectionEncoder encoder = _session.getSectionEncoder();
+ encoder.reset();
+
+ int sectionNumber = 0;
+ for(Section section : sections)
+ {
+ encoder.encodeObject(section);
+ }
+
+
+ Binary encoding = encoder.getEncoding();
+ ByteBuffer payload = encoding.asByteBuffer();
+ xfr.setPayload(payload);
+ }
+ if(message.getDeliveryTag() == null)
+ {
+ message.setDeliveryTag(new Binary(String.valueOf(_id++).getBytes()));
+ }
+ if(message.isResume())
+ {
+ xfr.setResume(Boolean.TRUE);
+ }
+ if(message.getDeliveryState() != null)
+ {
+ xfr.setState(message.getDeliveryState());
+ }
+
+ xfr.setDeliveryTag(message.getDeliveryTag());
+ //xfr.setSettled(_windowSize ==0);
+ if(txn != null)
+ {
+ xfr.setSettled(false);
+ TransactionalState deliveryState = new TransactionalState();
+ deliveryState.setTxnId(txn.getTxnId());
+ xfr.setState(deliveryState);
+ }
+ else
+ {
+ xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
+ }
+ final Object lock = _endpoint.getLock();
+ synchronized(lock)
+ {
+ while(!_endpoint.hasCreditToSend())
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ if(action != null)
+ {
+ _outcomeActions.put(message.getDeliveryTag(), action);
+ }
+ _endpoint.transfer(xfr);
+ //TODO - rationalise sending of flows
+ // _endpoint.sendFlow();
+ }
+
+ if(_windowSize != 0)
+ {
+ synchronized(lock)
+ {
+
+
+ while(_endpoint.getUnsettledCount() >= _windowSize)
+ {
+ try
+ {
+ lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ }
+
+
+ }
+
+ public void close() throws SenderClosingException
+ {
+
+ if(_windowSize != 0)
+ {
+ synchronized(_endpoint.getLock())
+ {
+
+
+ while(_endpoint.getUnsettledCount() > 0)
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+
+ }
+ _session.removeSender(this);
+ _endpoint.setSource(null);
+ _endpoint.detach();
+ _closed = true;
+
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new SenderClosingException(e);
+ }
+ }
+ }
+ }
+
+ public boolean isClosed()
+ {
+ return _closed;
+ }
+
+ public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ {
+ if(state instanceof Outcome)
+ {
+ OutcomeAction action;
+ if((action = _outcomeActions.remove(deliveryTag)) != null)
+ {
+ action.onOutcome(deliveryTag, (Outcome) state);
+ }
+ if(!Boolean.TRUE.equals(settled))
+ {
+ _endpoint.updateDisposition(deliveryTag, state, true);
+ }
+ }
+ }
+
+ public Map<Binary, DeliveryState> getRemoteUnsettled()
+ {
+ return _endpoint.getInitialUnsettledMap();
+ }
+
+ public Session getSession()
+ {
+ return _session;
+ }
+
+ public class SenderCreationException extends Exception
+ {
+ public SenderCreationException(Throwable e)
+ {
+ super(e);
+ }
+
+ public SenderCreationException(String e)
+ {
+ super(e);
+
+ }
+ }
+
+ public class SenderClosingException extends Exception
+ {
+ public SenderClosingException(Throwable e)
+ {
+ super(e);
+ }
+ }
+
+ public static interface OutcomeAction
+ {
+ public void onOutcome(Binary deliveryTag, Outcome outcome);
+ }
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java new file mode 100644 index 0000000000..5e1e1b1d7c --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -0,0 +1,354 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionState;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class Session
+{
+ private SessionEndpoint _endpoint;
+ private List<Receiver> _receivers = new ArrayList<Receiver>();
+ private List<Sender> _senders = new ArrayList<Sender>();
+ private SectionEncoder _sectionEncoder;
+ private SectionDecoder _sectionDecoder;
+ private TransactionController _sessionLocalTC;
+ private Connection _connection;
+
+ public Session(final Connection connection, String name)
+ {
+ _connection = connection;
+ _endpoint = connection.getEndpoint().createSession(name);
+ _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+ _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
+ }
+
+
+ public synchronized Sender createSender(final String targetName) throws Sender.SenderCreationException
+ {
+ return createSender(targetName, false);
+ }
+
+ public synchronized Sender createSender(final String targetName, boolean synchronous) throws Sender.SenderCreationException
+ {
+
+ final String sourceName = UUID.randomUUID().toString();
+ return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous);
+
+ }
+
+ public synchronized Sender createSender(final String targetName, int window) throws Sender.SenderCreationException
+ {
+ final String sourceName = UUID.randomUUID().toString();
+ return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, window);
+
+ }
+
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode) throws Sender.SenderCreationException
+ {
+
+ return createSender(targetName, window, mode, null);
+ }
+
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName) throws Sender.SenderCreationException
+ {
+ return createSender(targetName, window, mode, linkName, null);
+ }
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled) throws Sender.SenderCreationException
+ {
+ return createSender(targetName, window, mode, linkName, false, unsettled);
+ }
+
+ public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
+ boolean isDurable, Map<Binary, Outcome> unsettled) throws Sender.SenderCreationException
+ {
+ return new Sender(this, linkName == null ? "->" + targetName + '(' + UUID.randomUUID().toString()+')': linkName,
+ targetName, null, window, mode, isDurable, unsettled);
+
+ }
+
+
+ public Receiver createReceiver(final String sourceAddr) throws AmqpErrorException
+ {
+ return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
+ }
+
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode) throws AmqpErrorException
+ {
+ return createReceiver(queue, null, mode);
+ }
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
+ throws AmqpErrorException
+ {
+ return createReceiver(queue, null, mode, linkName);
+ }
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
+ throws AmqpErrorException
+ {
+ return createReceiver(queue, null, mode, linkName, isDurable);
+ }
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
+ Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+ throws AmqpErrorException
+ {
+ return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
+ }
+
+
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
+ boolean isDurable, Map<Binary, Outcome> unsettled) throws AmqpErrorException
+ {
+ return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
+ }
+
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
+ throws AmqpErrorException
+ {
+ return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
+ }
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
+ throws AmqpErrorException
+ {
+ return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
+ }
+
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode) throws AmqpErrorException
+ {
+ return createReceiver(sourceAddr, mode, ackMode, null);
+ }
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode, String linkName) throws AmqpErrorException
+ {
+ return createReceiver(sourceAddr,mode, ackMode, linkName, false);
+ }
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode, String linkName, boolean isDurable)
+ throws AmqpErrorException
+ {
+ return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
+ }
+
+ private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+ Map<Binary, Outcome> unsettled) throws AmqpErrorException
+ {
+ return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
+ }
+
+ public synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
+ final AcknowledgeMode ackMode, String linkName, boolean isDurable,
+ Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+ throws AmqpErrorException
+ {
+
+ final Target target = new Target();
+ final Source source = new Source();
+ source.setAddress(sourceAddr);
+ source.setDistributionMode(mode);
+ source.setFilter(filters);
+
+ if(linkName == null)
+ {
+ linkName = sourceAddr + "-> (" + UUID.randomUUID().toString() + ")";
+ }
+
+ final Receiver receiver =
+ new Receiver(this, linkName,
+ target, source, ackMode, isDurable, unsettled);
+ _receivers.add(receiver);
+
+ return receiver;
+
+ }
+
+ public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws AmqpErrorException
+ {
+ return createReceiver(sourceAddr, StdDistMode.COPY);
+ }
+
+ public synchronized Receiver createMovingReceiver(final String sourceAddr) throws AmqpErrorException
+ {
+ return createReceiver(sourceAddr, StdDistMode.MOVE);
+ }
+
+ public Receiver createTemporaryQueueReceiver() throws AmqpErrorException
+ {
+ Source source = new Source();
+ source.setDynamic(true);
+
+ final Receiver receiver = new Receiver(this, "tempSender"+UUID.randomUUID().toString(), new Target(),
+ source, AcknowledgeMode.ALO);
+ _receivers.add(receiver);
+ return receiver;
+ }
+
+ public Sender createTemporaryQueueSender() throws Sender.SenderCreationException
+ {
+ Target target = new Target();
+ target.setDynamic(true);
+
+ final Sender sender;
+ sender = new Sender(this, "tempSender"+ UUID.randomUUID().toString(), target,
+ new Source(), 0, AcknowledgeMode.ALO);
+ _senders.add(sender);
+ return sender;
+ }
+
+
+
+ public SessionEndpoint getEndpoint()
+ {
+ return _endpoint;
+ }
+
+ public synchronized void close()
+ {
+ try
+ {
+ for(Sender sender : new ArrayList<Sender>(_senders))
+ {
+ sender.close();
+ }
+ for(Receiver receiver : new ArrayList<Receiver>(_receivers))
+ {
+ receiver.detach();
+ }
+ if(_sessionLocalTC != null)
+ {
+ _sessionLocalTC.close();
+ }
+ _endpoint.end();
+ }
+ catch (Sender.SenderClosingException e)
+ {
+// TODO
+ e.printStackTrace();
+ }
+
+ //TODO
+
+ }
+
+ void removeSender(Sender sender)
+ {
+ _senders.remove(sender);
+ }
+
+ void removeReceiver(Receiver receiver)
+ {
+ _receivers.remove(receiver);
+ }
+
+ public SectionEncoder getSectionEncoder()
+ {
+ return _sectionEncoder;
+ }
+
+ public SectionDecoder getSectionDecoder()
+ {
+ return _sectionDecoder;
+ }
+
+
+ public Transaction createSessionLocalTransaction()
+ {
+ TransactionController localController = getSessionLocalTransactionController();
+ return localController.beginTransaction();
+ }
+
+ private TransactionController getSessionLocalTransactionController()
+ {
+ if(_sessionLocalTC == null)
+ {
+ _sessionLocalTC = createSessionLocalTransactionController();
+ }
+ return _sessionLocalTC;
+ }
+
+ private TransactionController createSessionLocalTransactionController()
+ {
+ String name = "txnControllerLink";
+ SendingLinkEndpoint tcLinkEndpoint = _endpoint.createTransactionController(name, TxnCapability.LOCAL_TXN,
+ TxnCapability.MULTI_TXNS_PER_SSN);
+ tcLinkEndpoint.setReceivingSettlementMode(ReceiverSettleMode.FIRST);
+ tcLinkEndpoint.setSendingSettlementMode(SenderSettleMode.UNSETTLED);
+ tcLinkEndpoint.attach();
+ return new TransactionController(this, tcLinkEndpoint);
+ }
+
+
+ public Message receive()
+ {
+ while(getEndpoint().getState() == SessionState.ACTIVE)
+ {
+ synchronized (getEndpoint().getLock())
+ {
+ try
+ {
+ for(Receiver r : _receivers)
+ {
+ Message m = r.receive(false);
+ if(m != null)
+ return m;
+ }
+ wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ return null;
+ }
+
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+}
diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java new file mode 100644 index 0000000000..a379463710 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Transaction.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.type.Binary; + +public class Transaction +{ + private TransactionController _transactionController; + private Binary _txnId; + + Transaction(final TransactionController transactionController, Binary txnId) + { + _transactionController = transactionController; + _txnId = txnId; + } + + public void commit() + { + _transactionController.commit(this); + } + + public void rollback() + { + _transactionController.rollback(this); + } + + public Binary getTxnId() + { + return _txnId; + } +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java new file mode 100644 index 0000000000..9f2c76bc72 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransactionController.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; +import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.transaction.Declare; +import org.apache.qpid.amqp_1_0.type.transaction.Declared; +import org.apache.qpid.amqp_1_0.type.transaction.Discharge; +import org.apache.qpid.amqp_1_0.type.transport.Transfer; + + +public class TransactionController implements DeliveryStateHandler +{ + private static final Binary DELIVERY_TAG = new Binary(new byte[]{(byte) 0}); + private SendingLinkEndpoint _endpoint; + private Session _session; + private volatile DeliveryState _state; + private boolean _received; + + public TransactionController(Session session, SendingLinkEndpoint tcLinkEndpoint) + { + _session = session; + _endpoint = tcLinkEndpoint; + _endpoint.setDeliveryStateHandler(this); + } + + public Transaction beginTransaction() + { + + + Binary txnId = declare(); + return new Transaction(this, txnId); + } + + private Binary declare() + { + SectionEncoder encoder = _session.getSectionEncoder(); + + + AmqpValue section = new AmqpValue(new Declare()); + + + Transfer transfer = new Transfer(); + transfer.setPayload(section.encode(encoder).asByteBuffer()); + transfer.setDeliveryTag(DELIVERY_TAG); + transfer.setSettled(Boolean.FALSE); + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + while(!_endpoint.hasCreditToSend()) + { + try + { + lock.wait(); + } + catch (InterruptedException e) + { + + } + } + _state = null; + _received = false; + _endpoint.transfer(transfer); + + //TODO - rationalise sending of flows + // _endpoint.sendFlow(); + } + synchronized (this) + { + while(!_received) + { + try + { + wait(); + } + catch (InterruptedException e) + { + + } + } + } + + + return ((Declared) _state).getTxnId(); + } + + + public void commit(final Transaction transaction) + { + discharge(transaction.getTxnId(), false); + } + + public void rollback(final Transaction transaction) + { + discharge(transaction.getTxnId(), true); + } + + private void discharge(final Binary txnId, final boolean fail) + { + Discharge discharge = new Discharge(); + discharge.setTxnId(txnId); + discharge.setFail(fail); + SectionEncoder encoder = _session.getSectionEncoder(); + + + AmqpValue section = new AmqpValue(discharge); + + Transfer transfer = new Transfer(); + transfer.setPayload(section.encode(encoder).asByteBuffer()); + transfer.setDeliveryTag(DELIVERY_TAG); + transfer.setSettled(Boolean.FALSE); + + final Object lock = _endpoint.getLock(); + synchronized(lock) + { + while(!_endpoint.hasCreditToSend()) + { + try + { + lock.wait(); + } + catch (InterruptedException e) + { + + } + } + _state = null; + _received = false; + _endpoint.transfer(transfer); + + //TODO - rationalise sending of flows + // _endpoint.sendFlow(); + } + synchronized (this) + { + while(!_received) + { + try + { + wait(); + } + catch (InterruptedException e) + { + + } + } + } + + + } + + public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled) + { + synchronized(this) + { + _state = state; + _received = true; + + if(!Boolean.TRUE.equals(settled)) + { + _endpoint.updateDisposition(deliveryTag, state, true); + } + + notifyAll(); + } + } + + public void close() + { + _endpoint.close(); + } +} diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java new file mode 100644 index 0000000000..6fe2a6d510 --- /dev/null +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java @@ -0,0 +1,529 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.client; + +import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.commons.cli.*; + +import java.util.logging.*; + +public abstract class Util +{ + + private static final Logger FRAME_LOGGER = Logger.getLogger("FRM"); + private static final Logger RAW_LOGGER = Logger.getLogger("RAW"); + private String _host; + private String _username; + private String _password; + private int _port; + private int _count; + private boolean _useStdIn; + private boolean _useTran; + private String[] _args; + private AcknowledgeMode _mode; + private boolean _block; + private int _frameSize; + private int _messageSize; + private String _responseQueue; + private int _batchSize; + private double _rollbackRatio; + private String _linkName; + private String _containerName; + private boolean _durableLink; + private boolean _useMultipleConnections; + private int _windowSize = 100; + private String _subject; + private String _filter; + private String _remoteHost; + private boolean _useSSL; + + protected Util(String[] args) + { + CommandLineParser cmdLineParse = new PosixParser(); + + Options options = new Options(); + options.addOption("h","help",false,"show this help message and exit"); + options.addOption(OptionBuilder.withLongOpt("host") + .withDescription( "host to connect to (default 0.0.0.0)" ) + .hasArg(true) + .withArgName("HOST") + .create('H')); + options.addOption(OptionBuilder.withLongOpt("username") + .withDescription( "username to use for authentication" ) + .hasArg(true) + .withArgName("USERNAME") + .create('u')); + options.addOption(OptionBuilder.withLongOpt("password") + .withDescription( "password to use for authentication" ) + .hasArg(true) + .withArgName("PASSWORD") + .create('w')); + options.addOption(OptionBuilder.withLongOpt("port") + .withDescription( "port to connect to (default 5672)" ) + .hasArg(true) + .withArgName("PORT") + .create('p')); + options.addOption(OptionBuilder.withLongOpt("frame-size") + .withDescription( "specify the maximum frame size" ) + .hasArg(true) + .withArgName("FRAME_SIZE") + .create('f')); + options.addOption(OptionBuilder.withLongOpt("container-name") + .withDescription( "Container name" ) + .hasArg(true) + .withArgName("CONTAINER_NAME") + .create('C')); + + options.addOption(OptionBuilder.withLongOpt("ssl") + .withDescription("Use SSL") + .create('S')); + + options.addOption(OptionBuilder.withLongOpt("remote-hostname") + .withDescription( "hostname to supply in the open frame" ) + .hasArg(true) + .withArgName("HOST") + .create('O')); + + if(hasBlockOption()) + options.addOption(OptionBuilder.withLongOpt("block") + .withDescription("block until messages arrive") + .create('b')); + + if(hasCountOption()) + options.addOption(OptionBuilder.withLongOpt("count") + .withDescription( "number of messages to send (default 1)" ) + .hasArg(true) + .withArgName("COUNT") + .create('c')); + if(hasModeOption()) + options.addOption(OptionBuilder.withLongOpt("acknowledge-mode") + .withDescription( "acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once" ) + .hasArg(true) + .withArgName("MODE") + .create('k')); + + if(hasSubjectOption()) + options.addOption(OptionBuilder.withLongOpt("subject") + .withDescription( "subject message property" ) + .hasArg(true) + .withArgName("SUBJECT") + .create('s')); + + + if(hasSingleLinkPerConnectionMode()) + options.addOption(OptionBuilder.withLongOpt("single-link-per-connection") + .withDescription("acknowledgement mode: AMO|ALO|EO (At Least Once, At Most Once, Exactly Once") + .hasArg(false) + .create('Z')); + + if(hasFilterOption()) + options.addOption(OptionBuilder.withLongOpt("filter") + .withDescription("filter, e.g. exact-subject=hello; matching-subject=%.a.#") + .hasArg(true) + .withArgName("<TYPE>=<VALUE>") + .create('F')); + + + if(hasTxnOption()) + { + options.addOption("x","txn",false,"use transactions"); + options.addOption(OptionBuilder.withLongOpt("batch-size") + .withDescription( "transaction batch size (default: 1)" ) + .hasArg(true) + .withArgName("BATCH-SIZE") + .create('B')); + options.addOption(OptionBuilder.withLongOpt("rollback-ratio") + .withDescription( "rollback ratio - must be between 0 and 1 (default: 0)" ) + .hasArg(true) + .withArgName("RATIO") + .create('R')); + } + + if(hasLinkDurableOption()) + { + options.addOption("d","durable-link",false,"use a durable link"); + } + + if(hasStdInOption()) + options.addOption("i","stdin",false,"read messages from stdin (one message per line)"); + + options.addOption(OptionBuilder.withLongOpt("trace") + .withDescription("trace logging specified categories: RAW, FRM") + .hasArg(true) + .withArgName("TRACE") + .create('t')); + if(hasSizeOption()) + options.addOption(OptionBuilder.withLongOpt("message-size") + .withDescription( "size to pad outgoing messages to" ) + .hasArg(true) + .withArgName("SIZE") + .create('z')); + + if(hasResponseQueueOption()) + options.addOption(OptionBuilder.withLongOpt("response-queue") + .withDescription( "response queue to reply to" ) + .hasArg(true) + .withArgName("RESPONSE_QUEUE") + .create('r')); + + if(hasLinkNameOption()) + { + options.addOption(OptionBuilder.withLongOpt("link") + .withDescription( "link name" ) + .hasArg(true) + .withArgName("LINK") + .create('l')); + } + + if(hasWindowSizeOption()) + { + options.addOption(OptionBuilder.withLongOpt("window-size") + .withDescription("credit window size") + .hasArg(true) + .withArgName("WINDOW-SIZE") + .create('W')); + } + + CommandLine cmdLine = null; + try + { + cmdLine = cmdLineParse.parse(options, args); + + } + catch (ParseException e) + { + printUsage(options); + System.exit(-1); + } + + if(cmdLine.hasOption('h') || cmdLine.getArgList().isEmpty()) + { + printUsage(options); + System.exit(0); + } + _host = cmdLine.getOptionValue('H',"0.0.0.0"); + _remoteHost = cmdLine.getOptionValue('O',null); + String portStr = cmdLine.getOptionValue('p',"5672"); + String countStr = cmdLine.getOptionValue('c',"1"); + + _useSSL = cmdLine.hasOption('S'); + + if(hasWindowSizeOption()) + { + String windowSizeStr = cmdLine.getOptionValue('W',"100"); + _windowSize = Integer.parseInt(windowSizeStr); + } + + if(hasSubjectOption()) + { + _subject = cmdLine.getOptionValue('s'); + } + + if(cmdLine.hasOption('u')) + { + _username = cmdLine.getOptionValue('u'); + } + + if(cmdLine.hasOption('w')) + { + _password = cmdLine.getOptionValue('w'); + } + + if(cmdLine.hasOption('F')) + { + _filter = cmdLine.getOptionValue('F'); + } + + _port = Integer.parseInt(portStr); + + _containerName = cmdLine.getOptionValue('C'); + + if(hasBlockOption()) + _block = cmdLine.hasOption('b'); + + if(hasLinkNameOption()) + _linkName = cmdLine.getOptionValue('l'); + + + if(hasLinkDurableOption()) + _durableLink = cmdLine.hasOption('d'); + + if(hasCountOption()) + _count = Integer.parseInt(countStr); + + if(hasStdInOption()) + _useStdIn = cmdLine.hasOption('i'); + + if(hasSingleLinkPerConnectionMode()) + _useMultipleConnections = cmdLine.hasOption('Z'); + + if(hasTxnOption()) + { + _useTran = cmdLine.hasOption('x'); + _batchSize = Integer.parseInt(cmdLine.getOptionValue('B',"1")); + _rollbackRatio = Double.parseDouble(cmdLine.getOptionValue('R',"0")); + } + + if(hasModeOption()) + { + _mode = AcknowledgeMode.ALO; + + if(cmdLine.hasOption('k')) + { + _mode = AcknowledgeMode.valueOf(cmdLine.getOptionValue('k')); + } + } + + if(hasResponseQueueOption()) + { + _responseQueue = cmdLine.getOptionValue('r'); + } + + _frameSize = Integer.parseInt(cmdLine.getOptionValue('f',"65536")); + + if(hasSizeOption()) + { + _messageSize = Integer.parseInt(cmdLine.getOptionValue('z',"-1")); + } + + String categoriesList = cmdLine.getOptionValue('t'); + String[]categories = categoriesList == null ? new String[0] : categoriesList.split("[, ]"); + for(String cat : categories) + { + if(cat.equalsIgnoreCase("FRM")) + { + FRAME_LOGGER.setLevel(Level.FINE); + Formatter formatter = new Formatter() + { + @Override + public String format(final LogRecord record) + { + return "[" + record.getMillis() + " FRM]\t" + record.getMessage() + "\n"; + } + }; + for(Handler handler : FRAME_LOGGER.getHandlers()) + { + FRAME_LOGGER.removeHandler(handler); + } + Handler handler = new ConsoleHandler(); + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + FRAME_LOGGER.addHandler(handler); + } + else if (cat.equalsIgnoreCase("RAW")) + { + RAW_LOGGER.setLevel(Level.FINE); + Formatter formatter = new Formatter() + { + @Override + public String format(final LogRecord record) + { + return "[" + record.getMillis() + " RAW]\t" + record.getMessage() + "\n"; + } + }; + for(Handler handler : RAW_LOGGER.getHandlers()) + { + RAW_LOGGER.removeHandler(handler); + } + Handler handler = new ConsoleHandler(); + handler.setLevel(Level.FINE); + handler.setFormatter(formatter); + RAW_LOGGER.addHandler(handler); + } + } + + + _args = cmdLine.getArgs(); + + } + + protected boolean hasFilterOption() + { + return false; + } + + protected boolean hasSubjectOption() + { + return false; + } + + protected boolean hasWindowSizeOption() + { + return false; + } + + protected boolean hasSingleLinkPerConnectionMode() + { + return false; + } + + protected abstract boolean hasLinkDurableOption(); + + protected abstract boolean hasLinkNameOption(); + + protected abstract boolean hasResponseQueueOption(); + + protected abstract boolean hasSizeOption(); + + protected abstract boolean hasBlockOption(); + + protected abstract boolean hasStdInOption(); + + protected abstract boolean hasTxnOption(); + + protected abstract boolean hasModeOption(); + + protected abstract boolean hasCountOption(); + + public String getHost() + { + return _host; + } + + public String getUsername() + { + return _username; + } + + public String getPassword() + { + return _password; + } + + public int getPort() + { + return _port; + } + + public int getCount() + { + return _count; + } + + public boolean useStdIn() + { + return _useStdIn; + } + + public boolean useTran() + { + return _useTran; + } + + public AcknowledgeMode getMode() + { + return _mode; + } + + public boolean isBlock() + { + return _block; + } + + public String[] getArgs() + { + return _args; + } + + public int getMessageSize() + { + return _messageSize; + } + + public String getResponseQueue() + { + return _responseQueue; + } + + public int getBatchSize() + { + return _batchSize; + } + + public double getRollbackRatio() + { + return _rollbackRatio; + } + + public String getLinkName() + { + return _linkName; + } + + public boolean isDurableLink() + { + return _durableLink; + } + + public boolean isUseMultipleConnections() + { + return _useMultipleConnections; + } + + public void setUseMultipleConnections(boolean useMultipleConnections) + { + _useMultipleConnections = useMultipleConnections; + } + + public String getSubject() + { + return _subject; + } + + public void setSubject(String subject) + { + _subject = subject; + } + + protected abstract void printUsage(final Options options); + + protected abstract void run(); + + + public Connection newConnection() throws Connection.ConnectionException + { + Container container = getContainerName() == null ? new Container() : new Container(getContainerName()); + return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container, + _remoteHost == null ? getHost() : _remoteHost, _useSSL) + : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize, + container, _remoteHost == null ? getHost() : _remoteHost, _useSSL); + } + + public String getContainerName() + { + return _containerName; + } + + public int getWindowSize() + { + return _windowSize; + } + + public void setWindowSize(int windowSize) + { + _windowSize = windowSize; + } + + public String getFilter() + { + return _filter; + } +} |
