diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-02-16 16:02:26 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-16 16:02:26 +0000 |
| commit | 72de13352dc9c42acfe95a1d76f049c507eb5cfd (patch) | |
| tree | 7d0a55ab52398bb6de7139cf77ea085b8f6f3edc /java | |
| parent | 40eba25f51c024d7e10cf1b5e1d9f0110feddcdf (diff) | |
| download | qpid-python-72de13352dc9c42acfe95a1d76f049c507eb5cfd.tar.gz | |
Additions to allow refs to be sent from broker to client. Also some tidy-up.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@508460 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
10 files changed, 109 insertions, 142 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index c5d0220989..b2d4215bd0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -309,33 +309,6 @@ public class AMQChannel _returnMessages.add(e); } } -// -// public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag) -// { -// deliver(msg, destination, new AMQMethodListener() -// { -// public boolean methodReceived(AMQMethodEvent evt) throws AMQException -// { -// AMQMethodBody method = evt.getMethod(); -// if (_log.isDebugEnabled()) -// { -// _log.debug(method + " received on channel " + _channelId); -// } -// // XXX: multiple? -// if (method instanceof MessageOkBody) -// { -// acknowledgeMessage(deliveryTag, false); -// return true; -// } -// else -// { -// // TODO: implement reject -// return false; -// } -// } -// public void error(Exception e) {} -// }); -// } public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag) { @@ -343,7 +316,6 @@ public class AMQChannel long maxFrameSize = _session.getFrameMax(); Iterable<ByteBuffer> contentItr = msg.getContents(); if (msg.getSize() > maxFrameSize) - //if(true) { Iterator<ByteBuffer> cItr = contentItr.iterator(); if (cItr.next().limit() > maxFrameSize) // First chunk should equal incoming frame size @@ -410,34 +382,29 @@ public class AMQChannel public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag) { final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes(); - deliverRef(refId, msg, destination, _session.getStateManager()); -// AMQMethodBody openBody = MessageOpenBody.createMethodBody( -// _session.getProtocolMajorVersion(), // AMQP major version -// _session.getProtocolMinorVersion(), // AMQP minor version -// refId); -// _session.writeRequest(_channelId, openBody, new AMQMethodListener() -// { -// public boolean methodReceived(AMQMethodEvent evt) throws AMQException -// { -// AMQMethodBody method = evt.getMethod(); -// if (_log.isDebugEnabled()) -// { -// _log.debug(method + " received on channel " + _channelId); -// } -// if (method instanceof MessageOkBody) -// { -// acknowledgeMessage(deliveryTag, false); -// deliverRef(refId, msg, destination, _session.getStateManager()); -// return true; -// } -// else -// { -// // TODO: implement reject -// return false; -// } -// } -// public void error(Exception e) {} -// }); + deliverRef(refId, msg, destination, new AMQMethodListener() + { + public boolean methodReceived(AMQMethodEvent evt) throws AMQException + { + AMQMethodBody method = evt.getMethod(); + if (_log.isDebugEnabled()) + { + _log.debug(method + " received on channel " + _channelId); + } + // XXX: multiple? + if (method instanceof MessageOkBody) + { + acknowledgeMessage(deliveryTag, false); + return true; + } + else + { + // TODO: implement reject + return false; + } + } + public void error(Exception e) {} + }); } public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener) @@ -471,58 +438,6 @@ public class AMQChannel _session.writeRequest(_channelId, closeBody, listener); } -// protected void route(AMQMessage msg) throws AMQException -// { -// if (isTransactional()) -// { -// //don't create a transaction unless needed -// if (msg.isPersistent()) -// { -// // _txnBuffer.containsPersistentChanges(); -// } -// -// //A publication will result in the enlisting of several -// //TxnOps. The first is an op that will store the message. -// //Following that (and ordering is important), an op will -// //be added for every queue onto which the message is -// //enqueued. Finally a cleanup op will be added to decrement -// //the reference associated with the routing. -// // Store storeOp = new Store(msg); -// // _txnBuffer.enlist(storeOp); -// // msg.setTxnBuffer(_txnBuffer); -// try -// { -// _exchanges.routeContent(msg); -// // _txnBuffer.enlist(new Cleanup(msg)); -// } -// catch (RequiredDeliveryException e) -// { -// //Can only be due to the mandatory flag, as no attempt -// //has yet been made to deliver the message. The -// //message will thus not have been delivered to any -// //queue so we can return the message (without killing -// //the transaction) and for efficiency remove the store -// //operation from the buffer. -// // _txnBuffer.cancel(storeOp); -// throw e; -// } -// } -// else -// { -// try -// { -// _exchanges.routeContent(msg); -// //following check implements the functionality -// //required by the 'immediate' flag: -// msg.checkDeliveredToConsumer(); -// } -// finally -// { -// msg.decrementReference(_storeContext); -// } -// } -// } - public RequestManager getRequestManager() { return _requestManager; diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java index 9743ff713b..e2aff8cd22 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java @@ -46,6 +46,12 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener< { AMQProtocolSession session = stateManager.getProtocolSession(); _logger.info("Received Connection-close-ok"); + // We wait for the Mina library to close the connection, which will happen when + // the client closes the Mina connection, causing AMQFastProtocolHand.sessionClosed() + // to be called. + // TODO - Find a better way of doing this without holding up this thread... + try { Thread.currentThread().sleep(2000); } // 2 seconds + catch (InterruptedException e) {} session.closeSession(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 4ae80480aa..b7ad8790f5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; @@ -120,7 +121,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar _log.info("Queue " + body.queue + " exists and is accesible to this connection [owner=" + queue.getOwner() +"]"); } //set this as the default queue on the channel: - session.getChannel(evt.getChannelId()).setDefaultQueue(queue); + AMQChannel channel = session.getChannel(evt.getChannelId()); + if (channel == null) + throw new AMQException("Attempt to write to non-existent channel " + evt.getChannelId() + ": " + body); + channel.setDefaultQueue(queue); + //session.getChannel(evt.getChannelId()).setDefaultQueue(queue); } if (!body.nowait) diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 2c65fc02da..58461e8f1b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -375,6 +375,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { checkMethodBodyVersion(methodBody); AMQChannel channel = getChannel(channelNum); + if (channel == null) + { + throw new RuntimeException("Attempt to write to non-existent channel " + channelNum + " method=" +methodBody); + } RequestManager requestManager = channel.getRequestManager(); return requestManager.sendRequest(methodBody, methodListener); } @@ -389,10 +393,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { checkMethodBodyVersion(methodBody); AMQChannel channel = getChannel(channelNum); + if (channel == null) + { + throw new RuntimeException("Attempt to write to non-existent channel " + channelNum + ": reqId=" + requestId + " method=" + methodBody); + } ResponseManager responseManager = channel.getResponseManager(); - try { + try + { responseManager.sendResponse(requestId, methodBody); - } catch (RequestResponseMappingException e) { + } + catch (RequestResponseMappingException e) + { throw new RuntimeException(e); } } @@ -599,7 +610,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, task.doTask(this); } } - _minaProtocolSession.close(); +// gsim-python +// _minaProtocolSession.close(); } /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 073d81493e..4571da502e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -163,7 +163,9 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco else { _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); + // TODO: Closing with code 200 ("reply-sucess") ??? This cannot be right! + // gsim-python //session.closeSessionRequest(200, new AMQShortString(throwable.getMessage())); session.closeSession(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java index 4178f48cf5..235ddaacb9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java @@ -24,7 +24,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.log4j.Logger; @@ -47,7 +49,12 @@ public class MessageAppendMethodHandler implements StateAwareMethodListener try { protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod()); - System.out.println("Message.appened()-->Appending message content to body"); + + // Be aware of possible changes to parameter order as versions change. + final AMQMethodBody methodBody = MessageOkBody.createMethodBody( + protocolSession.getProtocolMajorVersion(), // AMQP major version + protocolSession.getProtocolMinorVersion()); // AMQP minor version + protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } catch (Exception e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java index 284b7444d2..62fde4d806 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java @@ -24,7 +24,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.log4j.Logger; @@ -46,10 +48,15 @@ public class MessageCloseMethodHandler implements StateAwareMethodListener { MessageCloseBody body = (MessageCloseBody)evt.getMethod(); String referenceId = new String(body.getReference()); - System.out.println("Message.closing()-->Handing message to session"); protocolSession.deliverMessageToAMQSession(evt.getChannelId(), referenceId); _logger.debug("Method Close Body received, notify session to accept unprocessed message"); + + // Be aware of possible changes to parameter order as versions change. + final AMQMethodBody methodBody = MessageOkBody.createMethodBody( + protocolSession.getProtocolMajorVersion(), // AMQP major version + protocolSession.getProtocolMinorVersion()); // AMQP minor version + protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java index 06d9c9ff99..8de85accdf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/MessageOpenMethodHandler.java @@ -25,7 +25,9 @@ import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQMethodEvent; //import org.apache.log4j.Logger; @@ -45,11 +47,15 @@ public class MessageOpenMethodHandler implements StateAwareMethodListener public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { - final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), null, false); - String referenceId = new String(((MessageOpenBody)evt.getMethod()).getReference()); - protocolSession.unprocessedMessageReceived(referenceId, msg); - - System.out.println("Message.open()-->Adding message to map with ref"); + byte[] referenceId = ((MessageOpenBody)evt.getMethod()).getReference(); + final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), referenceId); + protocolSession.unprocessedMessageReceived(new String(referenceId), msg); + + // Be aware of possible changes to parameter order as versions change. + final AMQMethodBody methodBody = MessageOkBody.createMethodBody( + protocolSession.getProtocolMajorVersion(), // AMQP major version + protocolSession.getProtocolMinorVersion()); // AMQP minor version + protocolSession.writeResponse(evt.getChannelId(), evt.getRequestId(), methodBody); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java index f242812d3b..3e5efd9068 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java @@ -35,11 +35,19 @@ public class UnprocessedMessage { private int bytesReceived = 0; private int channelId; + private byte[] referenceId; private List<byte[]> contents = new LinkedList(); private long deliveryTag; private boolean redeliveredFlag; private MessageHeaders messageHeaders; + public UnprocessedMessage(int channelId, long deliveryTag, byte[] referenceId) + { + this.channelId = channelId; + this.deliveryTag = deliveryTag; + this.referenceId = referenceId; + } + public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag) { this.channelId = channelId; @@ -73,6 +81,11 @@ public class UnprocessedMessage return channelId; } + public byte[] getReferenceId() + { + return referenceId; + } + public List<byte[]> getContents() { return contents; @@ -95,7 +108,9 @@ public class UnprocessedMessage public String toString() { - return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + new String(contents.get(0)); + return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + + new String(contents.get(0)); } public void setMessageHeaders(MessageHeaders messageHeaders) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java index 5c7f249107..27a312f3df 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PubSubTwoConnectionTest.java @@ -65,27 +65,19 @@ public class PubSubTwoConnectionTest extends TestCase assertEquals("Hello", tm1.getText()); } - public static void main(String[] args){ - PubSubTwoConnectionTest test = new PubSubTwoConnectionTest(); - try { - //test.setUp(); - //test.testTwoConnections(); - int a = 5; - - System.out.println(a++); - System.out.println(a); - System.out.println(++a); - - int b = ++a; - int c = a++; - - System.out.println(b); - System.out.println(c); - System.out.println(a); - - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + public static void main(String[] args) + { + PubSubTwoConnectionTest test = new PubSubTwoConnectionTest(); + try + { + test.setUp(); + test.testTwoConnections(); + test.tearDown(); + } + catch (Exception e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } } } |
