summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-11 10:19:17 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-11 10:19:17 +0000
commit6183b2736fae22b8bafb509e37386fa7a037c5f3 (patch)
tree00b68f41ebc7fa1ce306c177fc779a8819299e0a /qpid/java/broker-plugins
parentacf22a677693d9b0dfcdfdd8e8340af8cacbcb3c (diff)
downloadqpid-python-6183b2736fae22b8bafb509e37386fa7a037c5f3.tar.gz
QPID-5504 : refactring of queues, and introduce management node and amqp-management module
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1567026 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java156
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java84
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java271
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java268
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java148
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter20
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java6
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java140
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java151
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java281
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java27
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter20
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java2
-rw-r--r--qpid/java/broker-plugins/management-amqp/build.xml32
-rw-r--r--qpid/java/broker-plugins/management-amqp/pom.xml48
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java73
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java1402
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java242
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java39
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java220
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator19
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java3
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html8
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js8
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js4
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html2
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java2
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java2
32 files changed, 3580 insertions, 118 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
new file mode 100644
index 0000000000..37bbd810b4
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+public class MessageConverter_Internal_to_v0_10 implements MessageConverter<InternalMessage, MessageTransferMessage>
+{
+ @Override
+ public Class<InternalMessage> getInputClass()
+ {
+ return InternalMessage.class;
+ }
+
+ @Override
+ public Class<MessageTransferMessage> getOutputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ @Override
+ public MessageTransferMessage convert(InternalMessage serverMsg, VirtualHost vhost)
+ {
+ return new MessageTransferMessage(convertToStoredMessage(serverMsg), null);
+ }
+
+ private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final InternalMessage serverMsg)
+ {
+ final byte[] messageContent = MessageConverter_v0_10.convertToBody(serverMsg.getMessageBody());
+ final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg,
+ MessageConverter_v0_10.getBodyMimeType(
+ serverMsg.getMessageBody()),
+ messageContent.length);
+
+ return new StoredMessage<MessageMetaData_0_10>()
+ {
+ @Override
+ public MessageMetaData_0_10 getMetaData()
+ {
+ return messageMetaData_0_10;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMsg.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ int size = messageContent.length - offsetInMessage;
+ if(dst.remaining() < size)
+ {
+ size = dst.remaining();
+ }
+ ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size)
+ {
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ MessageProperties messageProps = new MessageProperties();
+
+
+
+ deliveryProps.setExpiration(serverMsg.getExpiration());
+ deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
+ deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+ deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+
+ messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
+ messageProps.setContentLength(size);
+ messageProps.setContentType(bodyMimeType);
+ if(serverMsg.getMessageHeader().getCorrelationId() != null)
+ {
+ messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
+ }
+
+ Header header = new Header(deliveryProps, messageProps, null);
+ return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
+ }
+
+
+ @Override
+ public String getType()
+ {
+ return "Internal to v0-10";
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 5244a7f51b..32ecc6bd0e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -20,7 +20,14 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoreFuture;
@@ -30,9 +37,13 @@ import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.codec.BBEncoder;
public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, MessageTransferMessage>
{
+
+ public static final Charset UTF_8 = Charset.forName("UTF-8");
+
@Override
public Class<ServerMessage> getInputClass()
{
@@ -129,6 +140,79 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
}
+
+ public static byte[] convertToBody(Object object)
+ {
+ if(object instanceof String)
+ {
+ return ((String)object).getBytes(UTF_8);
+ }
+ else if(object instanceof byte[])
+ {
+ return (byte[]) object;
+ }
+ else if(object instanceof Map)
+ {
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeMap((Map)object);
+ ByteBuffer buf = encoder.segment();
+ int remaining = buf.remaining();
+ byte[] data = new byte[remaining];
+ buf.get(data);
+ return data;
+
+ }
+ else if(object instanceof List)
+ {
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeList((List) object);
+ ByteBuffer buf = encoder.segment();
+ int remaining = buf.remaining();
+ byte[] data = new byte[remaining];
+ buf.get(data);
+ return data;
+ }
+ else
+ {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream os = new ObjectOutputStream(bytesOut);
+ os.writeObject(object);
+ return bytesOut.toByteArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static String getBodyMimeType(Object object)
+ {
+ if(object instanceof String)
+ {
+ return "text/plain";
+ }
+ else if(object instanceof byte[])
+ {
+ return "application/octet-stream";
+ }
+ else if(object instanceof Map)
+ {
+ return "amqp/map";
+ }
+ else if(object instanceof List)
+ {
+ return "amqp/list";
+ }
+ else
+ {
+ return "application/java-object-stream";
+ }
+ }
+
+
@Override
public String getType()
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
new file mode 100644
index 0000000000..bc5f8899f2
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -0,0 +1,271 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageMetaData;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+public class MessageConverter_v0_10_to_Internal implements MessageConverter<MessageTransferMessage, InternalMessage>
+{
+ @Override
+ public Class<MessageTransferMessage> getInputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ @Override
+ public Class<InternalMessage> getOutputClass()
+ {
+ return InternalMessage.class;
+ }
+
+ @Override
+ public InternalMessage convert(MessageTransferMessage serverMessage, VirtualHost vhost)
+ {
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
+
+ Object body = convertMessageBody(mimeType, data);
+ MessageProperties messageProps = serverMessage.getHeader().getMessageProperties();
+ AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo());
+ return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), fixedHeader, body);
+ }
+
+ private static class DelegatingMessageHeader implements AMQMessageHeader
+ {
+ private final AMQMessageHeader _delegate;
+ private final ReplyTo _replyTo;
+
+
+ private DelegatingMessageHeader(final AMQMessageHeader delegate, final ReplyTo replyTo)
+ {
+ _delegate = delegate;
+ _replyTo = replyTo;
+ }
+
+ @Override
+ public String getCorrelationId()
+ {
+ return _delegate.getCorrelationId();
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return _delegate.getExpiration();
+ }
+
+ @Override
+ public String getUserId()
+ {
+ return _delegate.getUserId();
+ }
+
+ @Override
+ public String getAppId()
+ {
+ return _delegate.getAppId();
+ }
+
+ @Override
+ public String getMessageId()
+ {
+ return _delegate.getMessageId();
+ }
+
+ @Override
+ public String getMimeType()
+ {
+ return _delegate.getMimeType();
+ }
+
+ @Override
+ public String getEncoding()
+ {
+ return _delegate.getEncoding();
+ }
+
+ @Override
+ public byte getPriority()
+ {
+ return _delegate.getPriority();
+ }
+
+ @Override
+ public long getTimestamp()
+ {
+ return _delegate.getTimestamp();
+ }
+
+ @Override
+ public String getType()
+ {
+ return _delegate.getType();
+ }
+
+ @Override
+ public String getReplyTo()
+ {
+ return _replyTo == null
+ ? null
+ : _replyTo.getExchange() == null || _replyTo.getExchange().equals("")
+ ? _replyTo.getRoutingKey()
+ : _replyTo.getRoutingKey() == null || _replyTo.getRoutingKey().equals("")
+ ? _replyTo.getExchange()
+ : _replyTo.getExchange() + "/" + _replyTo.getRoutingKey();
+ }
+
+ @Override
+ public Object getHeader(final String name)
+ {
+ return _delegate.getHeader(name);
+ }
+
+ @Override
+ public boolean containsHeaders(final Set<String> names)
+ {
+ return _delegate.containsHeaders(names);
+ }
+
+ @Override
+ public boolean containsHeader(final String name)
+ {
+ return _delegate.containsHeader(name);
+ }
+
+ @Override
+ public Collection<String> getHeaderNames()
+ {
+ return _delegate.getHeaderNames();
+ }
+ }
+
+ private static Object convertMessageBody(String mimeType, byte[] data)
+ {
+ if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+ {
+ String text = new String(data);
+ return text;
+ }
+ else if("jms/map-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ LinkedHashMap map = new LinkedHashMap();
+ final int entries = reader.readIntImpl();
+ for (int i = 0; i < entries; i++)
+ {
+ try
+ {
+ String propName = reader.readStringImpl();
+ Object value = reader.readObject();
+
+ map.put(propName, value);
+ }
+ catch (EOFException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+
+ }
+
+ return map;
+
+ }
+ else if("amqp/map".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ final Map<String,Object> map = decoder.readMap();
+
+ return map;
+
+ }
+ else if("amqp/list".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ return decoder.readList();
+ }
+ else if("jms/stream-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ List list = new ArrayList();
+ while (reader.remaining() != 0)
+ {
+ try
+ {
+ list.add(reader.readObject());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ catch (EOFException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ }
+ return list;
+ }
+ else
+ {
+ return data;
+
+ }
+ }
+
+ @Override
+ public String getType()
+ {
+ return "v0-10 to Internal";
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 53022c333e..87a02b99c1 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -928,10 +928,10 @@ public class ServerSession extends Session
return getId().compareTo(o.getId());
}
- private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+ private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
@Override
- public void performAction(final MessageInstance<C> entry)
+ public void performAction(final MessageInstance<?,C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
if(queue instanceof CapacityChecker)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
index 995b0fabdc..dd115905a4 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
@@ -17,3 +17,5 @@
# under the License.
#
org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10
+org.apache.qpid.server.protocol.v0_10.MessageConverter_Internal_to_v0_10
+org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10_to_Internal
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 7e712c8e17..8becdf853b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1202,14 +1202,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>>
+ private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
public ImmediateAction()
{
}
- public void performAction(MessageInstance<C> entry)
+ public void performAction(MessageInstance<?,C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
@@ -1274,10 +1274,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>>
+ private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
@Override
- public void performAction(final MessageInstance<C> entry)
+ public void performAction(final MessageInstance<?,C> entry)
{
TransactionLogResource queue = entry.getOwningResource();
if(queue instanceof CapacityChecker)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
new file mode 100644
index 0000000000..b80ad3e7b8
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -0,0 +1,268 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBEncoder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_Internal_to_v0_8 implements MessageConverter<InternalMessage, AMQMessage>
+{
+ private static final int BASIC_CLASS_ID = 60;
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+
+ public Class<InternalMessage> getInputClass()
+ {
+ return InternalMessage.class;
+ }
+
+ @Override
+ public Class<AMQMessage> getOutputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ @Override
+ public AMQMessage convert(InternalMessage serverMsg, VirtualHost vhost)
+ {
+ return new AMQMessage(convertToStoredMessage(serverMsg), null);
+ }
+
+ private StoredMessage<MessageMetaData> convertToStoredMessage(final InternalMessage serverMsg)
+ {
+ final byte[] messageContent = convertToBody(serverMsg.getMessageBody());
+ final MessageMetaData messageMetaData_0_8 = convertMetaData(serverMsg,
+ getBodyMimeType(serverMsg.getMessageBody()),
+ messageContent.length);
+
+ return new StoredMessage<MessageMetaData>()
+ {
+ @Override
+ public MessageMetaData getMetaData()
+ {
+ return messageMetaData_0_8;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMsg.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ int size = messageContent.length - offsetInMessage;
+ if(dst.remaining() < size)
+ {
+ size = dst.remaining();
+ }
+ ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private MessageMetaData convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size)
+ {
+
+ MessagePublishInfo publishInfo = new MessagePublishInfo()
+ {
+ @Override
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ @Override
+ public void setExchange(final AMQShortString amqShortString)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ @Override
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+
+ final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setAppId(serverMsg.getMessageHeader().getAppId());
+ props.setContentType(bodyMimeType);
+ props.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId());
+ props.setDeliveryMode(serverMsg.isPersistent() ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT);
+ props.setExpiration(serverMsg.getExpiration());
+ props.setMessageId(serverMsg.getMessageHeader().getMessageId());
+ props.setPriority(serverMsg.getMessageHeader().getPriority());
+ props.setReplyTo(serverMsg.getMessageHeader().getReplyTo());
+ props.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+ props.setUserId(serverMsg.getMessageHeader().getUserId());
+
+ Map<String,Object> headerProps = new LinkedHashMap<String, Object>();
+
+ for(String headerName : serverMsg.getMessageHeader().getHeaderNames())
+ {
+ headerProps.put(headerName, serverMsg.getMessageHeader().getHeader(headerName));
+ }
+
+ props.setHeaders(FieldTable.convertToFieldTable(headerProps));
+
+ final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+ return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
+ }
+
+
+ @Override
+ public String getType()
+ {
+ return "Internal to v0-8";
+ }
+
+
+ public static byte[] convertToBody(Object object)
+ {
+ if(object instanceof String)
+ {
+ return ((String)object).getBytes(UTF_8);
+ }
+ else if(object instanceof byte[])
+ {
+ return (byte[]) object;
+ }
+ else if(object instanceof Map)
+ {
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeMap((Map)object);
+ ByteBuffer buf = encoder.segment();
+ int remaining = buf.remaining();
+ byte[] data = new byte[remaining];
+ buf.get(data);
+ return data;
+
+ }
+ else if(object instanceof List)
+ {
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeList((List) object);
+ ByteBuffer buf = encoder.segment();
+ int remaining = buf.remaining();
+ byte[] data = new byte[remaining];
+ buf.get(data);
+ return data;
+ }
+ else
+ {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream os = new ObjectOutputStream(bytesOut);
+ os.writeObject(object);
+ return bytesOut.toByteArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static String getBodyMimeType(Object object)
+ {
+ if(object instanceof String)
+ {
+ return "text/plain";
+ }
+ else if(object instanceof byte[])
+ {
+ return "application/octet-stream";
+ }
+ else if(object instanceof Map)
+ {
+ return "amqp/map";
+ }
+ else if(object instanceof List)
+ {
+ return "amqp/list";
+ }
+ else
+ {
+ return "application/java-object-stream";
+ }
+ }
+
+}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
new file mode 100644
index 0000000000..6076ff66c7
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -0,0 +1,148 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMessage, InternalMessage>
+{
+ @Override
+ public Class<AMQMessage> getInputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ @Override
+ public Class<InternalMessage> getOutputClass()
+ {
+ return InternalMessage.class;
+ }
+
+ @Override
+ public InternalMessage convert(AMQMessage serverMessage, VirtualHost vhost)
+ {
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
+
+ Object body = convertMessageBody(mimeType, data);
+
+ return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), body);
+ }
+
+ private static Object convertMessageBody(String mimeType, byte[] data)
+ {
+ if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+ {
+ String text = new String(data);
+ return text;
+ }
+ else if("jms/map-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ LinkedHashMap map = new LinkedHashMap();
+ final int entries = reader.readIntImpl();
+ for (int i = 0; i < entries; i++)
+ {
+ try
+ {
+ String propName = reader.readStringImpl();
+ Object value = reader.readObject();
+
+ map.put(propName, value);
+ }
+ catch (EOFException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+
+ }
+
+ return map;
+
+ }
+ else if("amqp/map".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ final Map<String,Object> map = decoder.readMap();
+
+ return map;
+
+ }
+ else if("amqp/list".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ return decoder.readList();
+ }
+ else if("jms/stream-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ List list = new ArrayList();
+ while (reader.remaining() != 0)
+ {
+ try
+ {
+ list.add(reader.readObject());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ catch (EOFException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ }
+ return list;
+ }
+ else
+ {
+ return data;
+
+ }
+ }
+
+ @Override
+ public String getType()
+ {
+ return "v0-8 to Internal";
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
new file mode 100644
index 0000000000..d87bc2566f
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.protocol.v0_8.MessageConverter_Internal_to_v0_8
+org.apache.qpid.server.protocol.v0_8.MessageConverter_v0_8_to_Internal \ No newline at end of file
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
index dded0c70fe..f47525097e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
@@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -36,7 +36,7 @@ import java.util.List;
public class AcknowledgeTest extends QpidTestCase
{
private AMQChannel _channel;
- private SimpleAMQQueue _queue;
+ private AMQQueue _queue;
private MessageStore _messageStore;
private String _queueName;
@@ -79,7 +79,7 @@ public class AcknowledgeTest extends QpidTestCase
return (InternalTestProtocolSession)_channel.getProtocolSession();
}
- private SimpleAMQQueue getQueue()
+ private AMQQueue getQueue()
{
return _queue;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
index f6376e56c4..dc687e1075 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
@@ -26,10 +26,8 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -39,7 +37,7 @@ import java.util.List;
public class QueueBrowserUsesNoAckTest extends QpidTestCase
{
private AMQChannel _channel;
- private SimpleAMQQueue _queue;
+ private AMQQueue _queue;
private MessageStore _messageStore;
private String _queueName;
@@ -82,7 +80,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase
return (InternalTestProtocolSession)_channel.getProtocolSession();
}
- private SimpleAMQQueue getQueue()
+ private AMQQueue getQueue()
{
return _queue;
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
new file mode 100644
index 0000000000..f02908391a
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.server.message.internal.InternalMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<InternalMessage>
+{
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+
+ public Class<InternalMessage> getInputClass()
+ {
+ return InternalMessage.class;
+ }
+
+
+ @Override
+ protected MessageMetaData_1_0 convertMetaData(final InternalMessage serverMessage,
+ final SectionEncoder sectionEncoder)
+ {
+ List<Section> sections = new ArrayList<Section>(3);
+ Header header = new Header();
+
+ header.setDurable(serverMessage.isPersistent());
+ header.setPriority(UnsignedByte.valueOf(serverMessage.getMessageHeader().getPriority()));
+ if(serverMessage.getExpiration() != 0l && serverMessage.getArrivalTime() !=0l && serverMessage.getExpiration() >= serverMessage.getArrivalTime())
+ {
+ header.setTtl(UnsignedInteger.valueOf(serverMessage.getExpiration()-serverMessage.getArrivalTime()));
+ }
+
+ sections.add(header);
+
+ Properties properties = new Properties();
+ properties.setCorrelationId(serverMessage.getMessageHeader().getCorrelationId());
+ properties.setCreationTime(new Date(serverMessage.getMessageHeader().getTimestamp()));
+ properties.setMessageId(serverMessage.getMessageHeader().getMessageId());
+ final String userId = serverMessage.getMessageHeader().getUserId();
+ if(userId != null)
+ {
+ properties.setUserId(new Binary(userId.getBytes(UTF_8)));
+ }
+ properties.setReplyTo(serverMessage.getMessageHeader().getReplyTo());
+
+ sections.add(properties);
+
+ if(!serverMessage.getMessageHeader().getHeaderNames().isEmpty())
+ {
+ ApplicationProperties applicationProperties = new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap() );
+ sections.add(applicationProperties);
+ }
+ return new MessageMetaData_1_0(sections, sectionEncoder);
+
+ }
+
+ protected Section getBodySection(final InternalMessage serverMessage, final String mimeType)
+ {
+ return convertToBody(serverMessage.getMessageBody());
+ }
+
+
+ @Override
+ public String getType()
+ {
+ return "Internal to v1-0";
+ }
+
+
+ public Section convertToBody(Object object)
+ {
+ if(object instanceof String)
+ {
+ return new AmqpValue(object);
+ }
+ else if(object instanceof byte[])
+ {
+ return new Data(new Binary((byte[])object));
+ }
+ else if(object instanceof Map)
+ {
+ return new AmqpValue(MessageConverter_to_1_0.fixMapValues((Map)object));
+ }
+ else if(object instanceof List)
+ {
+ return new AmqpValue(MessageConverter_to_1_0.fixListValues((List)object));
+ }
+ else
+ {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream os = new ObjectOutputStream(bytesOut);
+ os.writeObject(object);
+ return new Data(new Binary(bytesOut.toByteArray()));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index a96d951de6..a8a203b247 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -156,7 +156,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
}
- private static Map fixMapValues(final Map<String, Object> map)
+ static Map fixMapValues(final Map<String, Object> map)
{
for(Map.Entry<String,Object> entry : map.entrySet())
{
@@ -165,7 +165,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
return map;
}
- private static Object fixValue(final Object value)
+ static Object fixValue(final Object value)
{
if(value instanceof byte[])
{
@@ -185,7 +185,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
}
- private static List fixListValues(final List list)
+ static List fixListValues(final List list)
{
ListIterator iterator = list.listIterator();
while(iterator.hasNext())
@@ -198,83 +198,88 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
- final ServerMessage serverMessage,
+ final M serverMessage,
SectionEncoder sectionEncoder)
{
- final String mimeType = serverMessage.getMessageHeader().getMimeType();
- byte[] data = new byte[(int) serverMessage.getSize()];
- serverMessage.getContent(ByteBuffer.wrap(data), 0);
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ Section bodySection = getBodySection(serverMessage, mimeType);
- Section bodySection = convertMessageBody(mimeType, data);
+ final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
- final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
-
- return new StoredMessage<MessageMetaData_1_0>()
- {
- @Override
- public MessageMetaData_1_0 getMetaData()
- {
- return metaData;
- }
-
- @Override
- public long getMessageNumber()
- {
- return serverMessage.getMessageNumber();
- }
-
- @Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- int size;
- if(dst.remaining()<buf.remaining())
- {
- buf.limit(dst.remaining());
- size = dst.remaining();
- }
- else
+ return new StoredMessage<MessageMetaData_1_0>()
{
- size = buf.remaining();
- }
- dst.put(buf);
- return size;
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- if(size < buf.remaining())
- {
- buf.limit(size);
- }
- return buf;
- }
+ @Override
+ public MessageMetaData_1_0 getMetaData()
+ {
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMessage.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ int size;
+ if(dst.remaining()<buf.remaining())
+ {
+ buf.limit(dst.remaining());
+ size = dst.remaining();
+ }
+ else
+ {
+ size = buf.remaining();
+ }
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ if(size < buf.remaining())
+ {
+ buf.limit(size);
+ }
+ return buf;
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
- @Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
+ protected Section getBodySection(final M serverMessage, final String mimeType)
+ {
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
+ return convertMessageBody(mimeType, data);
+ }
private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
new file mode 100644
index 0000000000..f639f98dba
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+public class MessageConverter_v1_0_to_Internal implements MessageConverter<Message_1_0, InternalMessage>
+{
+
+ static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance();
+ static
+ {
+ TYPE_REGISTRY.registerTransportLayer();
+ TYPE_REGISTRY.registerMessagingLayer();
+ TYPE_REGISTRY.registerTransactionLayer();
+ TYPE_REGISTRY.registerSecurityLayer();
+ }
+
+ @Override
+ public Class<Message_1_0> getInputClass()
+ {
+ return Message_1_0.class;
+ }
+
+ @Override
+ public Class<InternalMessage> getOutputClass()
+ {
+ return InternalMessage.class;
+ }
+
+ @Override
+ public InternalMessage convert(Message_1_0 serverMessage, VirtualHost vhost)
+ {
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+
+
+
+
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getStoredMessage().getContent(0,ByteBuffer.wrap(data));
+
+ SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(TYPE_REGISTRY);
+
+ try
+ {
+ List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data));
+ ListIterator<Section> iterator = sections.listIterator();
+ Section previousSection = null;
+ while(iterator.hasNext())
+ {
+ Section section = iterator.next();
+ if(!(section instanceof AmqpValue || section instanceof Data || section instanceof AmqpSequence))
+ {
+ iterator.remove();
+ }
+ else
+ {
+ if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue))
+ {
+ throw new RuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
+ }
+ else
+ {
+ previousSection = section;
+ }
+ }
+ }
+
+ Object bodyObject;
+
+ if(sections.isEmpty())
+ {
+ // should actually be illegal
+ bodyObject = new byte[0];
+ }
+ else
+ {
+ Section firstBodySection = sections.get(0);
+ if(firstBodySection instanceof AmqpValue)
+ {
+ bodyObject = fixObject(((AmqpValue)firstBodySection).getValue());
+ }
+ else if(firstBodySection instanceof Data)
+ {
+ int totalSize = 0;
+ for(Section section : sections)
+ {
+ totalSize += ((Data)section).getValue().getLength();
+ }
+ byte[] bodyData = new byte[totalSize];
+ ByteBuffer buf = ByteBuffer.wrap(bodyData);
+ for(Section section : sections)
+ {
+ buf.put(((Data)section).getValue().asByteBuffer());
+ }
+ bodyObject = bodyData;
+ }
+ else
+ {
+ ArrayList totalSequence = new ArrayList();
+ for(Section section : sections)
+ {
+ totalSequence.addAll(((AmqpSequence)section).getValue());
+ }
+ bodyObject = fixObject(totalSequence);
+ }
+ }
+ return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject);
+
+ }
+ catch (AmqpErrorException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+
+
+
+ }
+
+ private Object fixObject(final Object value)
+ {
+ if(value instanceof Binary)
+ {
+ final Binary binaryValue = (Binary) value;
+ byte[] data = new byte[binaryValue.getLength()];
+ binaryValue.asByteBuffer().get(data);
+ return data;
+ }
+ else if(value instanceof List)
+ {
+ List listValue = (List) value;
+ List fixedValue = new ArrayList(listValue.size());
+ for(Object o : listValue)
+ {
+ fixedValue.add(fixObject(o));
+ }
+ return fixedValue;
+ }
+ else if(value instanceof Map)
+ {
+ Map<?,?> mapValue = (Map) value;
+ Map fixedValue = new LinkedHashMap(mapValue.size());
+ for(Map.Entry<?,?> entry : mapValue.entrySet())
+ {
+ fixedValue.put(fixObject(entry.getKey()),fixObject(entry.getValue()));
+ }
+ return fixedValue;
+ }
+ else
+ {
+ return value;
+ }
+
+ }
+
+ private static Object convertMessageBody(String mimeType, byte[] data)
+ {
+ if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+ {
+ String text = new String(data);
+ return text;
+ }
+ else if("jms/map-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ LinkedHashMap map = new LinkedHashMap();
+ final int entries = reader.readIntImpl();
+ for (int i = 0; i < entries; i++)
+ {
+ try
+ {
+ String propName = reader.readStringImpl();
+ Object value = reader.readObject();
+
+ map.put(propName, value);
+ }
+ catch (EOFException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+
+ }
+
+ return map;
+
+ }
+ else if("amqp/map".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ final Map<String,Object> map = decoder.readMap();
+
+ return map;
+
+ }
+ else if("amqp/list".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ return decoder.readList();
+ }
+ else if("jms/stream-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ List list = new ArrayList();
+ while (reader.remaining() != 0)
+ {
+ try
+ {
+ list.add(reader.readObject());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ catch (EOFException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ }
+ return list;
+ }
+ else
+ {
+ return data;
+
+ }
+ }
+
+ @Override
+ public String getType()
+ {
+ return "v0-8 to Internal";
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 9e0327fe76..f796a4b2e3 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -43,16 +43,7 @@ import org.apache.qpid.amqp_1_0.type.DeliveryState;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.Error;
@@ -391,15 +382,21 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
options.add(Consumer.Option.NO_LOCAL);
}
-
- _consumer.setNoLocal(noLocal);
-
-
try
{
+ final String name;
+ if(getEndpoint().getTarget() instanceof Target)
+ {
+ Target target = (Target) getEndpoint().getTarget();
+ name = target.getAddress() == null ? getEndpoint().getName() : target.getAddress();
+ }
+ else
+ {
+ name = getEndpoint().getName();
+ }
_consumer = _queue.addConsumer(_target,
messageFilter == null ? null : new SimpleFilterManager(messageFilter),
- Message_1_0.class, getEndpoint().getName(), options);
+ Message_1_0.class, name, options);
}
catch (AMQException e)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
new file mode 100644
index 0000000000..aa24847805
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.protocol.v1_0.MessageConverter_Internal_to_v1_0
+org.apache.qpid.server.protocol.v1_0.MessageConverter_v1_0_to_Internal \ No newline at end of file
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index f1843de8ac..0c83c31ad4 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -61,7 +61,7 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
{
if(deliveryProps.hasDeliveryMode())
{
- props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
+ props.setDeliveryMode((deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
? BasicContentHeaderProperties.PERSISTENT
: BasicContentHeaderProperties.NON_PERSISTENT));
}
diff --git a/qpid/java/broker-plugins/management-amqp/build.xml b/qpid/java/broker-plugins/management-amqp/build.xml
new file mode 100644
index 0000000000..542bb952f1
--- /dev/null
+++ b/qpid/java/broker-plugins/management-amqp/build.xml
@@ -0,0 +1,32 @@
+<!--
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -->
+<project name="Qpid Broker-Plugins AMQP Management" default="build">
+ <property name="module.depends" value="common broker-core" />
+ <property name="module.test.depends" value="qpid-test-utils broker-core/tests" />
+
+ <property name="module.genpom" value="true"/>
+ <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker-core=provided"/>
+
+ <property name="broker.plugin" value="true"/>
+ <property name="broker-plugins-management-amqp.libs" value="" />
+
+ <import file="../../module.xml" />
+
+ <target name="bundle" depends="bundle-tasks"/>
+</project>
diff --git a/qpid/java/broker-plugins/management-amqp/pom.xml b/qpid/java/broker-plugins/management-amqp/pom.xml
new file mode 100644
index 0000000000..83873a61f2
--- /dev/null
+++ b/qpid/java/broker-plugins/management-amqp/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>qpid-parent</artifactId>
+ <groupId>org.apache.qpid</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>qpid-broker-plugins-management-amqp</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>AMQP Management Protocol Plug-in</name>
+ <description>AMQP Management broker plug-in</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ </build>
+
+</project>
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java
new file mode 100644
index 0000000000..10a16faa56
--- /dev/null
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import java.util.Arrays;
+
+class ManagedEntityType
+{
+ private final String _name;
+ private final ManagedEntityType[] _parents;
+ private final String[] _attributes;
+ private final String[] _operations;
+
+ ManagedEntityType(final String name,
+ final ManagedEntityType[] parents,
+ final String[] attributes,
+ final String[] operations)
+ {
+ _name = name;
+ _parents = parents;
+ _attributes = attributes;
+ _operations = operations;
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public ManagedEntityType[] getParents()
+ {
+ return _parents;
+ }
+
+ public String[] getAttributes()
+ {
+ return _attributes;
+ }
+
+ public String[] getOperations()
+ {
+ return _operations;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ManagedEntityType{" +
+ "name='" + _name + '\'' +
+ ", parents=" + Arrays.toString(_parents) +
+ ", attributes=" + Arrays.toString(_attributes) +
+ ", operations=" + Arrays.toString(_operations) +
+ '}';
+ }
+}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
new file mode 100644
index 0000000000..1c1c72dd0b
--- /dev/null
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -0,0 +1,1402 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.message.internal.InternalMessageHeader;
+import org.apache.qpid.server.model.AmqpManagement;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.plugin.SystemNodeCreator;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.nio.charset.Charset;
+import java.security.AccessControlException;
+import java.text.MessageFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+class ManagementNode implements MessageSource<ManagementNodeConsumer,ManagementNode>, MessageDestination
+{
+
+ public static final String NAME_ATTRIBUTE = "name";
+ public static final String IDENTITY_ATTRIBUTE = "identity";
+ public static final String TYPE_ATTRIBUTE = "type";
+ public static final String OPERATION_HEADER = "operation";
+ public static final String SELF_NODE_NAME = "self";
+ public static final String MANAGEMENT_TYPE = "org.amqp.management";
+ public static final String GET_TYPES = "GET-TYPES";
+ public static final String GET_ATTRIBUTES = "GET-ATTRIBUTES";
+ public static final String GET_OPERATIONS = "GET-OPERATIONS";
+ public static final String QUERY = "QUERY";
+ public static final String ENTITY_TYPES_HEADER = "entityTypes";
+ public static final String STATUS_CODE_HEADER = "statusCode";
+ public static final int STATUS_CODE_OK = 200;
+ public static final String ATTRIBUTES_HEADER = "attributes";
+ public static final String OFFSET_HEADER = "offset";
+ public static final String COUNT_HEADER = "count";
+ public static final String MANAGEMENT_NODE_NAME = "$management";
+ public static final String CREATE_OPERATION = "CREATE";
+ public static final String READ_OPERATION = "READ";
+ public static final String UPDATE_OPERATION = "UPDATE";
+ public static final String DELETE_OPERATION = "DELETE";
+ public static final String STATUS_DESCRIPTION_HEADER = "statusDescription";
+ public static final int NOT_FOUND_STATUS_CODE = 404;
+ public static final int NOT_IMPLEMENTED_STATUS_CODE = 501;
+ public static final int STATUS_CODE_NO_CONTENT = 204;
+ public static final int STATUS_CODE_FORBIDDEN = 403;
+ public static final int STATUS_CODE_BAD_REQUEST = 400;
+ public static final int STATUS_CODE_INTERNAL_ERROR = 500;
+
+
+ private final VirtualHost _virtualHost;
+
+ private final UUID _id;
+
+ private final CopyOnWriteArrayList<ConsumerRegistrationListener<ManagementNode>> _consumerRegistrationListeners =
+ new CopyOnWriteArrayList<ConsumerRegistrationListener<ManagementNode>>();
+
+ private final SystemNodeCreator.SystemNodeRegistry _registry;
+ private final ConfiguredObject _managedObject;
+ private Map<String, ManagementNodeConsumer> _consumers = new ConcurrentHashMap<String, ManagementNodeConsumer>();
+
+ private Map<String,ManagedEntityType> _entityTypes = Collections.synchronizedMap(new LinkedHashMap<String, ManagedEntityType>());
+
+ private Map<ManagedEntityType,Map<String,ConfiguredObject>> _entities = Collections.synchronizedMap(new LinkedHashMap<ManagedEntityType,Map<String,ConfiguredObject>>());
+
+
+ public ManagementNode(final SystemNodeCreator.SystemNodeRegistry registry,
+ final ConfiguredObject configuredObject)
+ {
+ _virtualHost = registry.getVirtualHost();
+ _registry = registry;
+ final String name = configuredObject.getId() + MANAGEMENT_NODE_NAME;
+ _id = UUID.nameUUIDFromBytes(name.getBytes(Charset.defaultCharset()));
+
+
+ _managedObject = configuredObject;
+
+ populateTypeMetaData(configuredObject.getClass(), false);
+
+ configuredObject.addChangeListener(new ModelObjectListener());
+
+ final Class managementClass = getManagementClass(_managedObject.getClass());
+ _entities.get(_entityTypes.get(managementClass.getName())).put(_managedObject.getName(), _managedObject);
+
+ Collection<Class<? extends ConfiguredObject>> childClasses = Model.getInstance().getChildTypes(managementClass);
+ for(Class<? extends ConfiguredObject> childClass : childClasses)
+ {
+ if(getManagementClass(childClass) != null)
+ {
+ for(ConfiguredObject child : _managedObject.getChildren(childClass))
+ {
+ _entities.get(_entityTypes.get(getManagementClass(childClass).getName())).put(child.getName(), child);
+ }
+ }
+ }
+
+ }
+
+ private Class getManagementClass(Class objectClass)
+ {
+ List<Class> allClasses = new ArrayList<Class>();
+ allClasses.add(objectClass);
+ allClasses.addAll(Arrays.asList(objectClass.getInterfaces()));
+ allClasses.add(objectClass.getSuperclass());
+ for(Class clazz : allClasses)
+ {
+ AmqpManagement annotation = (AmqpManagement) clazz.getAnnotation(AmqpManagement.class);
+ if(annotation != null)
+ {
+ return clazz;
+ }
+ }
+ return null;
+ }
+
+ private boolean populateTypeMetaData(final Class<? extends ConfiguredObject> objectClass, boolean allowCreate)
+ {
+ Class clazz = getManagementClass(objectClass);
+ if( clazz != null)
+ {
+ AmqpManagement annotation = (AmqpManagement) clazz.getAnnotation(AmqpManagement.class);
+ populateTypeMetaData(clazz, annotation);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private ManagedEntityType populateTypeMetaData(Class clazz,
+ final AmqpManagement entityType)
+ {
+
+ ManagedEntityType managedEntityType = _entityTypes.get(clazz.getName());
+
+ if(managedEntityType == null)
+ {
+ List<String> opsList = new ArrayList<String>(Arrays.asList(entityType.operations()));
+ if(entityType.creatable())
+ {
+ boolean isCreatableChild = false;
+ for(Class<? extends ConfiguredObject> parentConfig : Model.getInstance().getParentTypes(clazz))
+ {
+ isCreatableChild = parentConfig.isAssignableFrom(_managedObject.getClass());
+ if(isCreatableChild)
+ {
+ opsList.add(CREATE_OPERATION);
+ break;
+ }
+ }
+ }
+ opsList.addAll(Arrays.asList(READ_OPERATION, UPDATE_OPERATION, DELETE_OPERATION));
+
+ Set<ManagedEntityType> parentSet = new HashSet<ManagedEntityType>();
+
+ List<Class> allClasses = new ArrayList<Class>(Arrays.asList(clazz.getInterfaces()));
+ if(clazz.getSuperclass() != null)
+ {
+ allClasses.add(clazz.getSuperclass());
+ }
+
+ for(Class parentClazz : allClasses)
+ {
+ if(parentClazz.getAnnotation(AmqpManagement.class) != null)
+ {
+ ManagedEntityType parentType = populateTypeMetaData(parentClazz,
+ (AmqpManagement) parentClazz.getAnnotation(
+ AmqpManagement.class)
+ );
+ parentSet.add(parentType);
+ parentSet.addAll(Arrays.asList(parentType.getParents()));
+
+ }
+ }
+ managedEntityType = new ManagedEntityType(clazz.getName(), parentSet.toArray(new ManagedEntityType[parentSet.size()]), entityType.attributes(), opsList.toArray(new String[opsList.size()]));
+ _entityTypes.put(clazz.getName(),managedEntityType);
+ _entities.put(managedEntityType, Collections.synchronizedMap(new LinkedHashMap<String, ConfiguredObject>()));
+
+ if(ConfiguredObject.class.isAssignableFrom(clazz))
+ {
+ Collection<Class<? extends ConfiguredObject>> childTypes = Model.getInstance().getChildTypes(clazz);
+ for(Class<? extends ConfiguredObject> childClass : childTypes)
+ {
+ populateTypeMetaData(childClass, true);
+ }
+ }
+
+ }
+
+ return managedEntityType;
+
+ }
+
+ @Override
+ public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+ {
+
+ @SuppressWarnings("unchecked")
+ MessageConverter<M, InternalMessage> converter =
+ MessageConverterRegistry.getConverter((Class<M>)message.getClass(), InternalMessage.class);
+
+ final InternalMessage msg = converter.<M>convert(message, _virtualHost);
+
+ if(validateMessage(msg))
+ {
+ txn.addPostTransactionAction(new ServerTransaction.Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ enqueue(msg, instanceProperties, postEnqueueAction);
+ }
+
+ @Override
+ public void onRollback()
+ {
+
+ }
+ });
+
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ private boolean validateMessage(final ServerMessage message)
+ {
+ AMQMessageHeader header = message.getMessageHeader();
+ return containsStringHeader(header, TYPE_ATTRIBUTE) && containsStringHeader(header, OPERATION_HEADER)
+ && (containsStringHeader(header, NAME_ATTRIBUTE) || containsStringHeader(header, IDENTITY_ATTRIBUTE));
+ }
+
+ private boolean containsStringHeader(final AMQMessageHeader header, String name)
+ {
+ return header.containsHeader(name) && header.getHeader(name) instanceof String;
+ }
+
+ synchronized void enqueue(InternalMessage message, InstanceProperties properties, Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+ {
+ if(postEnqueueAction != null)
+ {
+ postEnqueueAction.performAction(new ConsumedMessageInstance(message, properties));
+ }
+
+
+
+ String name = (String) message.getMessageHeader().getHeader(NAME_ATTRIBUTE);
+ String id = (String) message.getMessageHeader().getHeader(IDENTITY_ATTRIBUTE);
+ String type = (String) message.getMessageHeader().getHeader(TYPE_ATTRIBUTE);
+ String operation = (String) message.getMessageHeader().getHeader(OPERATION_HEADER);
+
+ InternalMessage response;
+
+ if(SELF_NODE_NAME.equals(name) && type.equals(MANAGEMENT_TYPE))
+ {
+ response = performManagementOperation(message);
+ }
+ else if(CREATE_OPERATION.equals(operation))
+ {
+ response = performCreateOperation(message, type);
+ }
+ else
+ {
+
+ ConfiguredObject entity = findSubject(name, id, type);
+
+ if(entity != null)
+ {
+ response = performOperation(message, entity);
+ }
+ else
+ {
+ if(id != null)
+ {
+ response = createFailureResponse(message,
+ NOT_FOUND_STATUS_CODE,
+ "No entity with id {0} of type {1} found", id, type);
+ }
+ else
+ {
+ response = createFailureResponse(message,
+ NOT_FOUND_STATUS_CODE,
+ "No entity with name {0} of type {1} found", name, type);
+ }
+ }
+ }
+
+
+ ManagementNodeConsumer consumer = _consumers.get(message.getMessageHeader().getReplyTo());
+ if(consumer != null)
+ {
+ // TODO - check same owner
+ consumer.send(response);
+ }
+ // TODO - route to a queue
+
+ }
+
+ private InternalMessage performCreateOperation(final InternalMessage message, final String type)
+ {
+ InternalMessage response;
+ ManagedEntityType entityType = _entityTypes.get(type);
+ if(type != null)
+ {
+ if(Arrays.asList(entityType.getOperations()).contains(CREATE_OPERATION))
+ {
+ Object messageBody = message.getMessageBody();
+ if(messageBody instanceof Map)
+ {
+ try
+ {
+
+ Class<? extends ConfiguredObject> clazz =
+ (Class<? extends ConfiguredObject>) Class.forName(type);
+ try
+ {
+ ConfiguredObject child = _managedObject.createChild(clazz, (Map) messageBody);
+ if(child == null)
+ {
+ child = _entities.get(entityType).get(message.getMessageHeader().getHeader(NAME_ATTRIBUTE));
+ }
+ response = performReadOperation(message, child);
+ }
+ catch(RuntimeException e)
+ {
+ if (e instanceof AccessControlException || e.getCause() instanceof AMQSecurityException)
+ {
+ response = createFailureResponse(message, STATUS_CODE_FORBIDDEN, e.getMessage());
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ catch (ClassNotFoundException e)
+ {
+ response = createFailureResponse(message,
+ STATUS_CODE_INTERNAL_ERROR, "Unable to instantiate an instance of {0} ", type);
+ }
+ }
+ else
+ {
+ response = createFailureResponse(message,
+ STATUS_CODE_BAD_REQUEST,
+ "The message body in the request was not of the correct type");
+ }
+ }
+ else
+ {
+ response = createFailureResponse(message,
+ STATUS_CODE_FORBIDDEN,
+ "Cannot CREATE entities of type {0}", type);
+ }
+ }
+ else
+ {
+ response = createFailureResponse(message,
+ NOT_FOUND_STATUS_CODE,
+ "Unknown type {0}",type);
+ }
+ return response;
+ }
+
+ private InternalMessage performOperation(final InternalMessage requestMessage, final ConfiguredObject entity)
+ {
+ String operation = (String) requestMessage.getMessageHeader().getHeader(OPERATION_HEADER);
+
+ if(READ_OPERATION.equals(operation))
+ {
+ return performReadOperation(requestMessage, entity);
+ }
+ else if(DELETE_OPERATION.equals(operation))
+ {
+ return performDeleteOperation(requestMessage, entity);
+ }
+ else if(UPDATE_OPERATION.equals(operation))
+ {
+ return performUpdateOperation(requestMessage, entity);
+ }
+ else
+ {
+ return createFailureResponse(requestMessage, NOT_IMPLEMENTED_STATUS_CODE, "Unable to perform the {0} operation",operation);
+ }
+ }
+
+ private InternalMessage performReadOperation(final InternalMessage requestMessage, final ConfiguredObject entity)
+ {
+ final InternalMessageHeader requestHeader = requestMessage.getMessageHeader();
+ final MutableMessageHeader responseHeader = new MutableMessageHeader();
+ responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
+ ? requestHeader.getMessageId()
+ : requestHeader.getCorrelationId());
+ responseHeader.setMessageId(UUID.randomUUID().toString());
+ responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName());
+ responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString());
+ responseHeader.setHeader(STATUS_CODE_HEADER,STATUS_CODE_OK);
+ final String type = getManagementClass(entity.getClass()).getName();
+ responseHeader.setHeader(TYPE_ATTRIBUTE, type);
+
+ Map<String,Object> responseBody = new LinkedHashMap<String, Object>();
+ final ManagedEntityType entityType = _entityTypes.get(type);
+ for(String attribute : entityType.getAttributes())
+ {
+ responseBody.put(attribute, fixValue(entity.getAttribute(attribute)));
+ }
+
+ return InternalMessage.createMapMessage(_virtualHost.getMessageStore(),responseHeader, responseBody);
+ }
+
+
+ private InternalMessage performDeleteOperation(final InternalMessage requestMessage, final ConfiguredObject entity)
+ {
+ final InternalMessageHeader requestHeader = requestMessage.getMessageHeader();
+ final MutableMessageHeader responseHeader = new MutableMessageHeader();
+ responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
+ ? requestHeader.getMessageId()
+ : requestHeader.getCorrelationId());
+ responseHeader.setMessageId(UUID.randomUUID().toString());
+ responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName());
+ responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString());
+ final String type = getManagementClass(entity.getClass()).getName();
+ responseHeader.setHeader(TYPE_ATTRIBUTE, type);
+ try
+ {
+ entity.setDesiredState(entity.getActualState(),State.DELETED);
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_NO_CONTENT);
+ }
+ catch(RuntimeException e)
+ {
+ if (e instanceof AccessControlException || e.getCause() instanceof AMQSecurityException)
+ {
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_FORBIDDEN);
+ }
+ else
+ {
+ throw e;
+ }
+
+ }
+
+ return InternalMessage.createMapMessage(_virtualHost.getMessageStore(),responseHeader, Collections.emptyMap());
+ }
+
+
+ private InternalMessage performUpdateOperation(final InternalMessage requestMessage, final ConfiguredObject entity)
+ {
+ final InternalMessageHeader requestHeader = requestMessage.getMessageHeader();
+ final MutableMessageHeader responseHeader = new MutableMessageHeader();
+ responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
+ ? requestHeader.getMessageId()
+ : requestHeader.getCorrelationId());
+ responseHeader.setMessageId(UUID.randomUUID().toString());
+ responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName());
+ responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString());
+ final String type = getManagementClass(entity.getClass()).getName();
+ responseHeader.setHeader(TYPE_ATTRIBUTE, type);
+
+ Object messageBody = requestMessage.getMessageBody();
+ if(messageBody instanceof Map)
+ {
+ try
+ {
+ entity.setAttributes((Map)messageBody);
+ return performReadOperation(requestMessage, entity);
+ }
+ catch(RuntimeException e)
+ {
+ if (e instanceof AccessControlException || e.getCause() instanceof AMQSecurityException)
+ {
+ return createFailureResponse(requestMessage, STATUS_CODE_FORBIDDEN, e.getMessage());
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ else
+ {
+ return createFailureResponse(requestMessage,
+ STATUS_CODE_BAD_REQUEST,
+ "The message body in the request was not of the correct type");
+ }
+
+
+ }
+
+ private ConfiguredObject findSubject(final String name, final String id, final String type)
+ {
+ ConfiguredObject subject;
+ ManagedEntityType met = _entityTypes.get(type);
+ if(met == null)
+ {
+ return null;
+ }
+
+ subject = findSubject(name, id, met);
+ if(subject == null)
+ {
+ ArrayList<ManagedEntityType> allTypes = new ArrayList<ManagedEntityType>(_entityTypes.values());
+ for(ManagedEntityType entityType : allTypes)
+ {
+ if(Arrays.asList(entityType.getParents()).contains(met))
+ {
+ subject = findSubject(name, id, entityType);
+ if(subject != null)
+ {
+ return subject;
+ }
+ }
+ }
+ }
+ return subject;
+ }
+
+ private ConfiguredObject findSubject(final String name, final String id, final ManagedEntityType entityType)
+ {
+
+ Map<String, ConfiguredObject> objects = _entities.get(entityType);
+ if(name != null)
+ {
+ ConfiguredObject subject = objects.get(name);
+ if(subject != null)
+ {
+ return subject;
+ }
+ }
+ else
+ {
+ final Collection<ConfiguredObject> values = new ArrayList<ConfiguredObject>(objects.values());
+ for(ConfiguredObject o : values)
+ {
+ if(o.getId().toString().equals(id))
+ {
+ return o;
+ }
+ }
+ }
+ return null;
+ }
+
+ private InternalMessage createFailureResponse(final InternalMessage requestMessage,
+ final int statusCode,
+ final String stateDescription,
+ String... params)
+ {
+ final InternalMessageHeader requestHeader = requestMessage.getMessageHeader();
+ final MutableMessageHeader responseHeader = new MutableMessageHeader();
+ responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
+ ? requestHeader.getMessageId()
+ : requestHeader.getCorrelationId());
+ responseHeader.setMessageId(UUID.randomUUID().toString());
+ for(String header : requestHeader.getHeaderNames())
+ {
+ responseHeader.setHeader(header, requestHeader.getHeader(header));
+ }
+ responseHeader.setHeader(STATUS_CODE_HEADER, statusCode);
+ responseHeader.setHeader(STATUS_DESCRIPTION_HEADER, MessageFormat.format(stateDescription, params));
+ return InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), responseHeader, new byte[0]);
+
+ }
+
+ private InternalMessage performManagementOperation(final InternalMessage msg)
+ {
+ final InternalMessage responseMessage;
+ final InternalMessageHeader requestHeader = msg.getMessageHeader();
+ final MutableMessageHeader responseHeader = new MutableMessageHeader();
+ responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null
+ ? requestHeader.getMessageId()
+ : requestHeader.getCorrelationId());
+ responseHeader.setMessageId(UUID.randomUUID().toString());
+
+
+ String operation = (String) requestHeader.getHeader(OPERATION_HEADER);
+ if(GET_TYPES.equals(operation))
+ {
+ responseMessage = performGetTypes(requestHeader, responseHeader);
+ }
+ else if(GET_ATTRIBUTES.equals(operation))
+ {
+ responseMessage = performGetAttributes(requestHeader, responseHeader);
+ }
+ else if(GET_OPERATIONS.equals(operation))
+ {
+ responseMessage = performGetOperations(requestHeader, responseHeader);
+ }
+ else if(QUERY.equals(operation))
+ {
+ responseMessage = performQuery(requestHeader, responseHeader);
+ }
+ else
+ {
+ responseMessage = InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), requestHeader, new byte[0]);
+ }
+ return responseMessage;
+ }
+
+ private InternalMessage performGetTypes(final InternalMessageHeader requestHeader,
+ final MutableMessageHeader responseHeader)
+ {
+ final InternalMessage responseMessage;
+ List<String> restriction;
+ if(requestHeader.containsHeader(ENTITY_TYPES_HEADER))
+ {
+ restriction = (List<String>) requestHeader.getHeader(ENTITY_TYPES_HEADER);
+ }
+ else
+ {
+ restriction = null;
+ }
+
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
+ Map<String,Object> responseMap = new LinkedHashMap<String, Object>();
+ Map<String,ManagedEntityType> entityMapCopy;
+ synchronized (_entityTypes)
+ {
+ entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes);
+ }
+
+ for(ManagedEntityType type : entityMapCopy.values())
+ {
+ if(restriction == null || meetsIndirectRestriction(type,restriction))
+ {
+ final ManagedEntityType[] parents = type.getParents();
+ List<String> parentNames = new ArrayList<String>();
+ if(parents != null)
+ {
+ for(ManagedEntityType parent : parents)
+ {
+ parentNames.add(parent.getName());
+ }
+ }
+ responseMap.put(type.getName(), parentNames);
+ }
+ }
+ responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap);
+ return responseMessage;
+ }
+
+ private InternalMessage performGetAttributes(final InternalMessageHeader requestHeader,
+ final MutableMessageHeader responseHeader)
+ {
+ final InternalMessage responseMessage;
+ List<String> restriction;
+ if(requestHeader.containsHeader(ENTITY_TYPES_HEADER))
+ {
+ restriction = (List<String>) requestHeader.getHeader(ENTITY_TYPES_HEADER);
+ }
+ else
+ {
+ restriction = null;
+ }
+
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
+ Map<String,Object> responseMap = new LinkedHashMap<String, Object>();
+ Map<String,ManagedEntityType> entityMapCopy;
+ synchronized (_entityTypes)
+ {
+ entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes);
+ }
+
+ for(ManagedEntityType type : entityMapCopy.values())
+ {
+ if(restriction == null || restriction.contains(type.getName()))
+ {
+ responseMap.put(type.getName(), Arrays.asList(type.getAttributes()));
+ }
+ }
+ responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap);
+ return responseMessage;
+ }
+
+
+ private InternalMessage performGetOperations(final InternalMessageHeader requestHeader,
+ final MutableMessageHeader responseHeader)
+ {
+ final InternalMessage responseMessage;
+ List<String> restriction;
+ if(requestHeader.containsHeader(ENTITY_TYPES_HEADER))
+ {
+ restriction = (List<String>) requestHeader.getHeader(ENTITY_TYPES_HEADER);
+ }
+ else
+ {
+ restriction = null;
+ }
+
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
+ Map<String,Object> responseMap = new LinkedHashMap<String, Object>();
+ Map<String,ManagedEntityType> entityMapCopy;
+ synchronized (_entityTypes)
+ {
+ entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes);
+ }
+
+ for(ManagedEntityType type : entityMapCopy.values())
+ {
+ if(restriction == null || restriction.contains(type.getName()))
+ {
+ responseMap.put(type.getName(), Arrays.asList(type.getOperations()));
+ }
+ }
+ responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap);
+ return responseMessage;
+ }
+
+ private InternalMessage performQuery(final InternalMessageHeader requestHeader,
+ final MutableMessageHeader responseHeader)
+ {
+ final InternalMessage responseMessage;
+ List<String> restriction;
+ List<String> attributes;
+ int offset;
+ int count;
+
+ if(requestHeader.containsHeader(ENTITY_TYPES_HEADER))
+ {
+ restriction = (List<String>) requestHeader.getHeader(ENTITY_TYPES_HEADER);
+ responseHeader.setHeader(ENTITY_TYPES_HEADER, restriction);
+ }
+ else
+ {
+ restriction = new ArrayList<String>(_entityTypes.keySet());
+ }
+
+
+ if(requestHeader.containsHeader(ATTRIBUTES_HEADER))
+ {
+ attributes = (List<String>) requestHeader.getHeader(ATTRIBUTES_HEADER);
+ }
+ else
+ {
+ LinkedHashMap<String,Void> attributeSet = new LinkedHashMap<String, Void>();
+ for(String entityType : restriction)
+ {
+ ManagedEntityType type = _entityTypes.get(entityType);
+ if(type != null)
+ {
+ for(String attributeName : type.getAttributes())
+ {
+ attributeSet.put(attributeName, null);
+ }
+ }
+ }
+ attributes = new ArrayList<String>(attributeSet.keySet());
+
+ }
+
+ if(requestHeader.containsHeader(OFFSET_HEADER))
+ {
+ offset = ((Number) requestHeader.getHeader(OFFSET_HEADER)).intValue();
+ responseHeader.setHeader(OFFSET_HEADER,offset);
+ }
+ else
+ {
+ offset = 0;
+ }
+
+ if(requestHeader.containsHeader(COUNT_HEADER))
+ {
+ count = ((Number) requestHeader.getHeader(COUNT_HEADER)).intValue();
+ }
+ else
+ {
+ count = Integer.MAX_VALUE;
+ }
+
+ responseHeader.setHeader(ATTRIBUTES_HEADER, attributes);
+
+ responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK);
+ List<List<Object>> responseList = new ArrayList<List<Object>>();
+
+ int rowNo = 0;
+ for(String type : restriction)
+ {
+ ManagedEntityType entityType = _entityTypes.get(type);
+ if(entityType != null)
+ {
+ Map<String, ConfiguredObject> entityMap = _entities.get(entityType);
+ if(entityMap != null)
+ {
+ List<ConfiguredObject> entities;
+ synchronized(entityMap)
+ {
+ entities = new ArrayList<ConfiguredObject>(entityMap.values());
+ }
+ for(ConfiguredObject entity : entities)
+ {
+ if(rowNo++ >= offset)
+ {
+ Object[] attrValue = new Object[attributes.size()];
+ int col = 0;
+ for(String attr : attributes)
+ {
+ Object value;
+ if(TYPE_ATTRIBUTE.equals(attr))
+ {
+ value = entityType.getName();
+ }
+ else
+ {
+ value = fixValue(entity.getAttribute(attr));
+ }
+ attrValue[col++] = value;
+ }
+ responseList.add(Arrays.asList(attrValue));
+ }
+ if(responseList.size()==count)
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ if(responseList.size()==count)
+ {
+ break;
+ }
+ }
+ responseHeader.setHeader(COUNT_HEADER, count);
+ responseMessage = InternalMessage.createListMessage(_virtualHost.getMessageStore(),
+ responseHeader,
+ responseList);
+ return responseMessage;
+ }
+
+ private Object fixValue(final Object value)
+ {
+ Object fixedValue;
+ if(value instanceof Enum)
+ {
+ fixedValue = value.toString();
+ }
+ else if(value instanceof Map)
+ {
+ Map<Object, Object> oldValue = (Map<Object, Object>) value;
+ Map<Object, Object> newValue = new LinkedHashMap<Object, Object>();
+ for(Map.Entry<Object, Object> entry : oldValue.entrySet())
+ {
+ newValue.put(fixValue(entry.getKey()),fixValue(entry.getValue()));
+ }
+ fixedValue = newValue;
+ }
+ else if(value instanceof Collection)
+ {
+ Collection oldValue = (Collection) value;
+ List newValue = new ArrayList(oldValue.size());
+ for(Object o : oldValue)
+ {
+ newValue.add(fixValue(o));
+ }
+ fixedValue = newValue;
+ }
+ else if(value != null && value.getClass().isArray() && !(value instanceof byte[]))
+ {
+ fixedValue = fixValue(Arrays.asList((Object[])value));
+ }
+ else
+ {
+ fixedValue = value;
+ }
+ return fixedValue;
+
+ }
+
+
+ private boolean meetsIndirectRestriction(final ManagedEntityType type, final List<String> restriction)
+ {
+ if(restriction.contains(type.getName()))
+ {
+ return true;
+ }
+ if(type.getParents() != null)
+ {
+ for(ManagedEntityType parent : type.getParents())
+ {
+ if(meetsIndirectRestriction(parent, restriction))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized <T extends ConsumerTarget> ManagementNodeConsumer addConsumer(final T target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ final EnumSet<Consumer.Option> options) throws AMQException
+ {
+
+ final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
+ target.consumerAdded(managementNodeConsumer);
+ _consumers.put(consumerName, managementNodeConsumer);
+ for(ConsumerRegistrationListener<ManagementNode> listener : _consumerRegistrationListeners)
+ {
+ listener.consumerAdded(this, managementNodeConsumer);
+ }
+ return managementNodeConsumer;
+ }
+
+ @Override
+ public synchronized Collection<ManagementNodeConsumer> getConsumers()
+ {
+ return new ArrayList<ManagementNodeConsumer>(_consumers.values());
+ }
+
+ @Override
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener<ManagementNode> listener)
+ {
+ _consumerRegistrationListeners.add(listener);
+ }
+
+ @Override
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
+ {
+ _consumerRegistrationListeners.remove(listener);
+ }
+
+ @Override
+ public AuthorizationHolder getAuthorizationHolder()
+ {
+ return null;
+ }
+
+ @Override
+ public void setAuthorizationHolder(final AuthorizationHolder principalHolder)
+ {
+
+ }
+
+ @Override
+ public void setExclusiveOwningSession(final AMQSessionModel owner)
+ {
+
+ }
+
+ @Override
+ public AMQSessionModel getExclusiveOwningSession()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isExclusive()
+ {
+ return false;
+ }
+
+ @Override
+ public String getName()
+ {
+ return MANAGEMENT_NODE_NAME;
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ private class ConsumedMessageInstance implements MessageInstance<ConsumedMessageInstance,Consumer>
+ {
+ private final ServerMessage _message;
+ private final InstanceProperties _properties;
+
+ public ConsumedMessageInstance(final ServerMessage message,
+ final InstanceProperties properties)
+ {
+ _message = message;
+ _properties = properties;
+ }
+
+ @Override
+ public int getDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementDeliveryCount()
+ {
+
+ }
+
+ @Override
+ public void decrementDeliveryCount()
+ {
+
+ }
+
+ @Override
+ public void addStateChangeListener(final StateChangeListener<? super ConsumedMessageInstance, State> listener)
+ {
+
+ }
+
+ @Override
+ public boolean removeStateChangeListener(final StateChangeListener<? super ConsumedMessageInstance, State> listener)
+ {
+ return false;
+ }
+
+
+ @Override
+ public boolean acquiredByConsumer()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isAcquiredBy(final Consumer consumer)
+ {
+ return false;
+ }
+
+ @Override
+ public void setRedelivered()
+ {
+
+ }
+
+ @Override
+ public boolean isRedelivered()
+ {
+ return false;
+ }
+
+ @Override
+ public Consumer getDeliveredConsumer()
+ {
+ return null;
+ }
+
+ @Override
+ public void reject()
+ {
+
+ }
+
+ @Override
+ public boolean isRejectedBy(final Consumer consumer)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean getDeliveredToConsumer()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean expired() throws AMQException
+ {
+ return false;
+ }
+
+ @Override
+ public boolean acquire(final Consumer sub)
+ {
+ return false;
+ }
+
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action,
+ final ServerTransaction txn)
+ {
+ return 0;
+ }
+
+
+ @Override
+ public Filterable asFilterable()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isAvailable()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean acquire()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isAcquired()
+ {
+ return false;
+ }
+
+ @Override
+ public void release()
+ {
+
+ }
+
+ @Override
+ public boolean resend() throws AMQException
+ {
+ return false;
+ }
+
+ @Override
+ public void delete()
+ {
+
+ }
+
+ @Override
+ public boolean isDeleted()
+ {
+ return false;
+ }
+
+ @Override
+ public ServerMessage getMessage()
+ {
+ return _message;
+ }
+
+ @Override
+ public InstanceProperties getInstanceProperties()
+ {
+ return _properties;
+ }
+
+ @Override
+ public TransactionLogResource getOwningResource()
+ {
+ return ManagementNode.this;
+ }
+ }
+
+ private class ModelObjectListener implements ConfigurationChangeListener
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
+ {
+ if(newState == State.DELETED)
+ {
+ _registry.removeSystemNode(ManagementNode.this);
+ }
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+ {
+ final ManagedEntityType entityType = _entityTypes.get(getManagementClass(child.getClass()).getName());
+ if(entityType != null)
+ {
+ _entities.get(entityType).put(child.getName(), child);
+ }
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+ {
+ final ManagedEntityType entityType = _entityTypes.get(getManagementClass(child.getClass()).getName());
+ if(entityType != null)
+ {
+ _entities.get(entityType).remove(child.getName());
+ }
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+
+ }
+ }
+
+ private static class MutableMessageHeader implements AMQMessageHeader
+ {
+ private final LinkedHashMap<String, Object> _headers = new LinkedHashMap<String, Object>();
+ private String _correlationId;
+ private long _expiration;
+ private String _userId;
+ private String _appId;
+ private String _messageId;
+ private String _mimeType;
+ private String _encoding;
+ private byte _priority;
+ private long _timestamp;
+ private String _type;
+ private String _replyTo;
+
+ public void setCorrelationId(final String correlationId)
+ {
+ _correlationId = correlationId;
+ }
+
+ public void setExpiration(final long expiration)
+ {
+ _expiration = expiration;
+ }
+
+ public void setUserId(final String userId)
+ {
+ _userId = userId;
+ }
+
+ public void setAppId(final String appId)
+ {
+ _appId = appId;
+ }
+
+ public void setMessageId(final String messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public void setMimeType(final String mimeType)
+ {
+ _mimeType = mimeType;
+ }
+
+ public void setEncoding(final String encoding)
+ {
+ _encoding = encoding;
+ }
+
+ public void setPriority(final byte priority)
+ {
+ _priority = priority;
+ }
+
+ public void setTimestamp(final long timestamp)
+ {
+ _timestamp = timestamp;
+ }
+
+ public void setType(final String type)
+ {
+ _type = type;
+ }
+
+ public void setReplyTo(final String replyTo)
+ {
+ _replyTo = replyTo;
+ }
+
+ public String getCorrelationId()
+ {
+ return _correlationId;
+ }
+
+ public long getExpiration()
+ {
+ return _expiration;
+ }
+
+ public String getUserId()
+ {
+ return _userId;
+ }
+
+ public String getAppId()
+ {
+ return _appId;
+ }
+
+ public String getMessageId()
+ {
+ return _messageId;
+ }
+
+ public String getMimeType()
+ {
+ return _mimeType;
+ }
+
+ public String getEncoding()
+ {
+ return _encoding;
+ }
+
+ public byte getPriority()
+ {
+ return _priority;
+ }
+
+ public long getTimestamp()
+ {
+ return _timestamp;
+ }
+
+ public String getType()
+ {
+ return _type;
+ }
+
+ public String getReplyTo()
+ {
+ return _replyTo;
+ }
+
+ @Override
+ public Object getHeader(final String name)
+ {
+ return _headers.get(name);
+ }
+
+ @Override
+ public boolean containsHeaders(final Set<String> names)
+ {
+ return _headers.keySet().containsAll(names);
+ }
+
+ @Override
+ public boolean containsHeader(final String name)
+ {
+ return _headers.containsKey(name);
+ }
+
+ @Override
+ public Collection<String> getHeaderNames()
+ {
+ return Collections.unmodifiableCollection(_headers.keySet());
+ }
+
+ public void setHeader(String header, Object value)
+ {
+ _headers.put(header,value);
+ }
+
+ }
+}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
new file mode 100644
index 0000000000..98bb20d364
--- /dev/null
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -0,0 +1,242 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+class ManagementNodeConsumer implements Consumer
+{
+ private final long _id = Consumer.SUB_ID_GENERATOR.getAndIncrement();
+ private final ManagementNode _managementNode;
+ private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
+ private final ConsumerTarget _target;
+ private final Lock _stateChangeLock = new ReentrantLock();
+ private final String _name;
+ private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener();
+
+
+ public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target)
+ {
+ _name = consumerName;
+ _managementNode = managementNode;
+ _target = target;
+ target.setStateListener(_targetChangeListener);
+ }
+
+ @Override
+ public void externalStateChange()
+ {
+
+ }
+
+ @Override
+ public long getBytesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getUnacknowledgedBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getUnacknowledgedMessages()
+ {
+ return 0;
+ }
+
+ @Override
+ public AMQSessionModel getSessionModel()
+ {
+ return _target.getSessionModel();
+ }
+
+ @Override
+ public void setNoLocal(final boolean noLocal)
+ {
+ }
+
+ @Override
+ public long getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public boolean isSuspended()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isClosed()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean acquires()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean seesRequeues()
+ {
+ return false;
+ }
+
+ @Override
+ public void close() throws AMQException
+ {
+
+ }
+
+ @Override
+ public boolean trySendLock()
+ {
+ return _stateChangeLock.tryLock();
+ }
+
+ @Override
+ public void getSendLock()
+ {
+ _stateChangeLock.lock();
+ }
+
+ @Override
+ public void releaseSendLock()
+ {
+ _stateChangeLock.unlock();
+ }
+
+
+ @Override
+ public boolean isActive()
+ {
+ return false;
+ }
+
+ @Override
+ public String getName()
+ {
+ return _name;
+ }
+
+ @Override
+ public void flush() throws AMQException
+ {
+
+ }
+
+ ManagementNode getManagementNode()
+ {
+ return _managementNode;
+ }
+
+ void send(final InternalMessage response)
+ {
+ getSendLock();
+ try
+ {
+ final ManagementResponse responseEntry = new ManagementResponse(this, response);
+ if(_queue.isEmpty() && _target.allocateCredit(response))
+ {
+ _target.send(responseEntry,false);
+ }
+ else
+ {
+ _queue.add(responseEntry);
+ }
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ releaseSendLock();
+ }
+ }
+
+ private class TargetChangeListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State>
+ {
+ @Override
+ public void stateChanged(final ConsumerTarget object,
+ final ConsumerTarget.State oldState,
+ final ConsumerTarget.State newState)
+ {
+ if(newState == ConsumerTarget.State.ACTIVE)
+ {
+ deliverMessages();
+ }
+ }
+ }
+
+ private void deliverMessages()
+ {
+ getSendLock();
+ try
+ {
+ while(!_queue.isEmpty())
+ {
+
+ final ManagementResponse managementResponse = _queue.get(0);
+ if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
+ {
+ _queue.remove(0);
+ _target.send(managementResponse,false);
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ releaseSendLock();
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java
new file mode 100644
index 0000000000..cdb71f4859
--- /dev/null
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import org.apache.qpid.server.plugin.SystemNodeCreator;
+
+public class ManagementNodeCreator implements SystemNodeCreator
+{
+ @Override
+ public void register(final SystemNodeRegistry registry)
+ {
+ ManagementNode managementNode = new ManagementNode(registry,registry.getVirtualHostModel());
+ registry.registerSystemNode(managementNode);
+ }
+
+ @Override
+ public String getType()
+ {
+ return "AMQP-VIRTUALHOST-MANAGEMENT";
+ }
+}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
new file mode 100644
index 0000000000..e25f327420
--- /dev/null
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.management.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
+
+class ManagementResponse implements MessageInstance<ManagementResponse,ManagementNodeConsumer>
+{
+ private final ManagementNodeConsumer _consumer;
+ private int _deliveryCount;
+ private boolean _isRedelivered;
+ private boolean _isDelivered;
+ private boolean _isDeleted;
+ private InternalMessage _message;
+
+ ManagementResponse(final ManagementNodeConsumer consumer, final InternalMessage message)
+ {
+ _consumer = consumer;
+ _message = message;
+ }
+
+ @Override
+ public int getDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public void incrementDeliveryCount()
+ {
+ _deliveryCount++;
+ }
+
+ @Override
+ public void decrementDeliveryCount()
+ {
+ _deliveryCount--;
+ }
+
+ @Override
+ public void addStateChangeListener(final StateChangeListener<? super ManagementResponse, State> listener)
+ {
+
+ }
+
+ @Override
+ public boolean removeStateChangeListener(final StateChangeListener<? super ManagementResponse, State> listener)
+ {
+ return false;
+ }
+
+
+ @Override
+ public boolean acquiredByConsumer()
+ {
+ return !isDeleted();
+ }
+
+ @Override
+ public boolean isAcquiredBy(final ManagementNodeConsumer consumer)
+ {
+ return consumer == _consumer && !isDeleted();
+ }
+
+ @Override
+ public void setRedelivered()
+ {
+ _isRedelivered = true;
+ }
+
+ @Override
+ public boolean isRedelivered()
+ {
+ return _isRedelivered;
+ }
+
+ @Override
+ public ManagementNodeConsumer getDeliveredConsumer()
+ {
+ return isDeleted() ? null : _consumer;
+ }
+
+ @Override
+ public void reject()
+ {
+ delete();
+ }
+
+ @Override
+ public boolean isRejectedBy(final ManagementNodeConsumer consumer)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean getDeliveredToConsumer()
+ {
+ return _isDelivered;
+ }
+
+ @Override
+ public boolean expired() throws AMQException
+ {
+ return false;
+ }
+
+ @Override
+ public boolean acquire(final ManagementNodeConsumer sub)
+ {
+ return false;
+ }
+
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action,
+ final ServerTransaction txn)
+ {
+ return 0;
+ }
+
+
+ @Override
+ public Filterable asFilterable()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isAvailable()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean acquire()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isAcquired()
+ {
+ return !isDeleted();
+ }
+
+ @Override
+ public void release()
+ {
+ delete();
+ }
+
+ @Override
+ public boolean resend() throws AMQException
+ {
+ return false;
+ }
+
+ @Override
+ public void delete()
+ {
+ // TODO
+ }
+
+ @Override
+ public boolean isDeleted()
+ {
+ return _isDeleted;
+ }
+
+ @Override
+ public ServerMessage getMessage()
+ {
+ return _message;
+ }
+
+ @Override
+ public InstanceProperties getInstanceProperties()
+ {
+ return InstanceProperties.EMPTY;
+ }
+
+ @Override
+ public TransactionLogResource getOwningResource()
+ {
+ return _consumer.getManagementNode();
+ }
+}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator b/qpid/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator
new file mode 100644
index 0000000000..767f93782b
--- /dev/null
+++ b/qpid/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.management.amqp.ManagementNodeCreator
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
index 4d85d52997..0329379713 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java
@@ -19,9 +19,6 @@ package org.apache.qpid.server.management.plugin.servlet.rest;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
index 9a24e23407..410a5d23ca 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
@@ -35,16 +35,16 @@
<tr>
<td valign="top"><strong>Queue Type: </strong></td>
<td>
- <input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />
+ <input type="radio" id="formAddQueueTypeStandard" name="queueType" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />
<label for="formAddQueueTypeStandard">Standard</label>
&nbsp;&nbsp;
- <input type="radio" id="formAddQueueTypePriority" name="type" value="priority" dojoType="dijit.form.RadioButton" />
+ <input type="radio" id="formAddQueueTypePriority" name="queueType" value="priority" dojoType="dijit.form.RadioButton" />
<label for="formAddQueueTypePriority">Priority</label>
&nbsp;&nbsp;
- <input type="radio" id="formAddQueueTypeLVQ" name="type" value="lvq" dojoType="dijit.form.RadioButton" />
+ <input type="radio" id="formAddQueueTypeLVQ" name="queueType" value="lvq" dojoType="dijit.form.RadioButton" />
<label for="formAddQueueTypeLVQ">LVQ</label>
&nbsp;&nbsp;
- <input type="radio" id="formAddQueueTypeSorted" name="type" value="sorted" dojoType="dijit.form.RadioButton" />
+ <input type="radio" id="formAddQueueTypeSorted" name="queueType" value="sorted" dojoType="dijit.form.RadioButton" />
<label for="formAddQueueTypeSorted">Sorted</label>
</td>
</tr>
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
index 54cfe225c8..606f95e349 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
@@ -277,7 +277,7 @@ define(["dojo/_base/xhr",
"exclusive",
"owner",
"lifetimePolicy",
- "type",
+ "queueType",
"typeQualifier",
"alertRepeatGap",
"alertRepeatGapUnits",
@@ -359,14 +359,14 @@ define(["dojo/_base/xhr",
bytesDepth = formatter.formatBytes( this.queueData["unacknowledgedBytes"] );
this.unacknowledgedBytes.innerHTML = "(" + bytesDepth.value;
this.unacknowledgedBytesUnits.innerHTML = bytesDepth.units + ")";
- this.type.innerHTML = entities.encode(this.queueData[ "type" ]);
- if (this.queueData.type == "standard")
+ this.queueType.innerHTML = entities.encode(this.queueData[ "queueType" ]);
+ if (this.queueData.queueType == "standard")
{
this.typeQualifier.style.display = "none";
}
else
{
- this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData.type] + ": " + this.queueData[queueTypeKeys[this.queueData.type]] + ")");
+ this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData.queueType] + ": " + this.queueData[queueTypeKeys[this.queueData.queueType]] + ")");
}
if(this.queueData["messageGroupKey"])
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
index e8b8dd1721..64e821b2dd 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
@@ -96,7 +96,7 @@ define(["dojo/_base/xhr",
}
}
else if (!typeSpecificFields.hasOwnProperty(propName) ||
- formValues.type === typeSpecificFields[ propName ]) {
+ formValues.queueType === typeSpecificFields[ propName ]) {
if(formValues[ propName ] !== "") {
if (fieldConverters.hasOwnProperty(propName))
{
@@ -130,7 +130,7 @@ define(["dojo/_base/xhr",
theForm = registry.byId("formAddQueue");
array.forEach(theForm.getDescendants(), function(widget)
{
- if(widget.name === "type") {
+ if(widget.name === "queueType") {
widget.on("change", function(isChecked) {
var objId = widget.id + ":fields";
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
index 89b7327957..252deb3100 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
@@ -47,7 +47,7 @@
<div style="clear:both">
<div class="formLabel-labelCell" style="float:left; width: 150px;">Type:</div>
<div style="float:left;">
- <span class="type"></span>
+ <span class="queueType"></span>
<span class="typeQualifier"></span>
</div>
</div>
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index 49cb6aa8cb..d74aa41244 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -183,7 +183,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
@Override
public String getQueueType()
{
- return (String) _queue.getAttribute(Queue.TYPE);
+ return (String) _queue.getAttribute(Queue.QUEUE_TYPE);
}
public boolean isDurable()
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
index 3711a90f3b..f94c206512 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
@@ -127,7 +127,7 @@ public class QueueMBeanTest extends QpidTestCase
public void testQueueType() throws Exception
{
- assertAttribute("queueType", QUEUE_TYPE, Queue.TYPE);
+ assertAttribute("queueType", QUEUE_TYPE, Queue.QUEUE_TYPE);
}
public void testMaximumDeliveryCount() throws Exception