diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-11 10:19:17 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-11 10:19:17 +0000 |
| commit | 6183b2736fae22b8bafb509e37386fa7a037c5f3 (patch) | |
| tree | 00b68f41ebc7fa1ce306c177fc779a8819299e0a /qpid/java/broker-plugins | |
| parent | acf22a677693d9b0dfcdfdd8e8340af8cacbcb3c (diff) | |
| download | qpid-python-6183b2736fae22b8bafb509e37386fa7a037c5f3.tar.gz | |
QPID-5504 : refactring of queues, and introduce management node and amqp-management module
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1567026 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
32 files changed, 3580 insertions, 118 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java new file mode 100644 index 0000000000..37bbd810b4 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesFormatException; + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; + +public class MessageConverter_Internal_to_v0_10 implements MessageConverter<InternalMessage, MessageTransferMessage> +{ + @Override + public Class<InternalMessage> getInputClass() + { + return InternalMessage.class; + } + + @Override + public Class<MessageTransferMessage> getOutputClass() + { + return MessageTransferMessage.class; + } + + @Override + public MessageTransferMessage convert(InternalMessage serverMsg, VirtualHost vhost) + { + return new MessageTransferMessage(convertToStoredMessage(serverMsg), null); + } + + private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final InternalMessage serverMsg) + { + final byte[] messageContent = MessageConverter_v0_10.convertToBody(serverMsg.getMessageBody()); + final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg, + MessageConverter_v0_10.getBodyMimeType( + serverMsg.getMessageBody()), + messageContent.length); + + return new StoredMessage<MessageMetaData_0_10>() + { + @Override + public MessageMetaData_0_10 getMetaData() + { + return messageMetaData_0_10; + } + + @Override + public long getMessageNumber() + { + return serverMsg.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + int size = messageContent.length - offsetInMessage; + if(dst.remaining() < size) + { + size = dst.remaining(); + } + ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size); + dst.put(buf); + return size; + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + return ByteBuffer.wrap(messageContent, offsetInMessage, size); + } + + @Override + public StoreFuture flushToStore() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size) + { + DeliveryProperties deliveryProps = new DeliveryProperties(); + MessageProperties messageProps = new MessageProperties(); + + + + deliveryProps.setExpiration(serverMsg.getExpiration()); + deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); + deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); + deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); + + messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); + messageProps.setContentLength(size); + messageProps.setContentType(bodyMimeType); + if(serverMsg.getMessageHeader().getCorrelationId() != null) + { + messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); + } + + Header header = new Header(deliveryProps, messageProps, null); + return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); + } + + + @Override + public String getType() + { + return "Internal to v0-10"; + } +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index 5244a7f51b..32ecc6bd0e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -20,7 +20,14 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.store.StoreFuture; @@ -30,9 +37,13 @@ import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.codec.BBEncoder; public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, MessageTransferMessage> { + + public static final Charset UTF_8 = Charset.forName("UTF-8"); + @Override public Class<ServerMessage> getInputClass() { @@ -129,6 +140,79 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); } + + public static byte[] convertToBody(Object object) + { + if(object instanceof String) + { + return ((String)object).getBytes(UTF_8); + } + else if(object instanceof byte[]) + { + return (byte[]) object; + } + else if(object instanceof Map) + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeMap((Map)object); + ByteBuffer buf = encoder.segment(); + int remaining = buf.remaining(); + byte[] data = new byte[remaining]; + buf.get(data); + return data; + + } + else if(object instanceof List) + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList((List) object); + ByteBuffer buf = encoder.segment(); + int remaining = buf.remaining(); + byte[] data = new byte[remaining]; + buf.get(data); + return data; + } + else + { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try + { + ObjectOutputStream os = new ObjectOutputStream(bytesOut); + os.writeObject(object); + return bytesOut.toByteArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + public static String getBodyMimeType(Object object) + { + if(object instanceof String) + { + return "text/plain"; + } + else if(object instanceof byte[]) + { + return "application/octet-stream"; + } + else if(object instanceof Map) + { + return "amqp/map"; + } + else if(object instanceof List) + { + return "amqp/list"; + } + else + { + return "application/java-object-stream"; + } + } + + @Override public String getType() { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java new file mode 100644 index 0000000000..bc5f8899f2 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java @@ -0,0 +1,271 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.message.internal.InternalMessageMetaData; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesFormatException; + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; + +public class MessageConverter_v0_10_to_Internal implements MessageConverter<MessageTransferMessage, InternalMessage> +{ + @Override + public Class<MessageTransferMessage> getInputClass() + { + return MessageTransferMessage.class; + } + + @Override + public Class<InternalMessage> getOutputClass() + { + return InternalMessage.class; + } + + @Override + public InternalMessage convert(MessageTransferMessage serverMessage, VirtualHost vhost) + { + final String mimeType = serverMessage.getMessageHeader().getMimeType(); + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); + + Object body = convertMessageBody(mimeType, data); + MessageProperties messageProps = serverMessage.getHeader().getMessageProperties(); + AMQMessageHeader fixedHeader = new DelegatingMessageHeader(serverMessage.getMessageHeader(), messageProps == null ? null : messageProps.getReplyTo()); + return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), fixedHeader, body); + } + + private static class DelegatingMessageHeader implements AMQMessageHeader + { + private final AMQMessageHeader _delegate; + private final ReplyTo _replyTo; + + + private DelegatingMessageHeader(final AMQMessageHeader delegate, final ReplyTo replyTo) + { + _delegate = delegate; + _replyTo = replyTo; + } + + @Override + public String getCorrelationId() + { + return _delegate.getCorrelationId(); + } + + @Override + public long getExpiration() + { + return _delegate.getExpiration(); + } + + @Override + public String getUserId() + { + return _delegate.getUserId(); + } + + @Override + public String getAppId() + { + return _delegate.getAppId(); + } + + @Override + public String getMessageId() + { + return _delegate.getMessageId(); + } + + @Override + public String getMimeType() + { + return _delegate.getMimeType(); + } + + @Override + public String getEncoding() + { + return _delegate.getEncoding(); + } + + @Override + public byte getPriority() + { + return _delegate.getPriority(); + } + + @Override + public long getTimestamp() + { + return _delegate.getTimestamp(); + } + + @Override + public String getType() + { + return _delegate.getType(); + } + + @Override + public String getReplyTo() + { + return _replyTo == null + ? null + : _replyTo.getExchange() == null || _replyTo.getExchange().equals("") + ? _replyTo.getRoutingKey() + : _replyTo.getRoutingKey() == null || _replyTo.getRoutingKey().equals("") + ? _replyTo.getExchange() + : _replyTo.getExchange() + "/" + _replyTo.getRoutingKey(); + } + + @Override + public Object getHeader(final String name) + { + return _delegate.getHeader(name); + } + + @Override + public boolean containsHeaders(final Set<String> names) + { + return _delegate.containsHeaders(names); + } + + @Override + public boolean containsHeader(final String name) + { + return _delegate.containsHeader(name); + } + + @Override + public Collection<String> getHeaderNames() + { + return _delegate.getHeaderNames(); + } + } + + private static Object convertMessageBody(String mimeType, byte[] data) + { + if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) + { + String text = new String(data); + return text; + } + else if("jms/map-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + LinkedHashMap map = new LinkedHashMap(); + final int entries = reader.readIntImpl(); + for (int i = 0; i < entries; i++) + { + try + { + String propName = reader.readStringImpl(); + Object value = reader.readObject(); + + map.put(propName, value); + } + catch (EOFException e) + { + throw new IllegalArgumentException(e); + } + catch (TypedBytesFormatException e) + { + throw new IllegalArgumentException(e); + } + + } + + return map; + + } + else if("amqp/map".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + final Map<String,Object> map = decoder.readMap(); + + return map; + + } + else if("amqp/list".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + return decoder.readList(); + } + else if("jms/stream-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + List list = new ArrayList(); + while (reader.remaining() != 0) + { + try + { + list.add(reader.readObject()); + } + catch (TypedBytesFormatException e) + { + throw new RuntimeException(e); // TODO - Implement + } + catch (EOFException e) + { + throw new RuntimeException(e); // TODO - Implement + } + } + return list; + } + else + { + return data; + + } + } + + @Override + public String getType() + { + return "v0-10 to Internal"; + } +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 53022c333e..87a02b99c1 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -928,10 +928,10 @@ public class ServerSession extends Session return getId().compareTo(o.getId()); } - private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>> + private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<?,C>> { @Override - public void performAction(final MessageInstance<C> entry) + public void performAction(final MessageInstance<?,C> entry) { TransactionLogResource queue = entry.getOwningResource(); if(queue instanceof CapacityChecker) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter index 995b0fabdc..dd115905a4 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter @@ -17,3 +17,5 @@ # under the License. # org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10 +org.apache.qpid.server.protocol.v0_10.MessageConverter_Internal_to_v0_10 +org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10_to_Internal diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 7e712c8e17..8becdf853b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1202,14 +1202,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>> + private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>> { public ImmediateAction() { } - public void performAction(MessageInstance<C> entry) + public void performAction(MessageInstance<?,C> entry) { TransactionLogResource queue = entry.getOwningResource(); @@ -1274,10 +1274,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>> + private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<?,C>> { @Override - public void performAction(final MessageInstance<C> entry) + public void performAction(final MessageInstance<?,C> entry) { TransactionLogResource queue = entry.getOwningResource(); if(queue instanceof CapacityChecker) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java new file mode 100644 index 0000000000..b80ad3e7b8 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -0,0 +1,268 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.codec.BBEncoder; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class MessageConverter_Internal_to_v0_8 implements MessageConverter<InternalMessage, AMQMessage> +{ + private static final int BASIC_CLASS_ID = 60; + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + + public Class<InternalMessage> getInputClass() + { + return InternalMessage.class; + } + + @Override + public Class<AMQMessage> getOutputClass() + { + return AMQMessage.class; + } + + @Override + public AMQMessage convert(InternalMessage serverMsg, VirtualHost vhost) + { + return new AMQMessage(convertToStoredMessage(serverMsg), null); + } + + private StoredMessage<MessageMetaData> convertToStoredMessage(final InternalMessage serverMsg) + { + final byte[] messageContent = convertToBody(serverMsg.getMessageBody()); + final MessageMetaData messageMetaData_0_8 = convertMetaData(serverMsg, + getBodyMimeType(serverMsg.getMessageBody()), + messageContent.length); + + return new StoredMessage<MessageMetaData>() + { + @Override + public MessageMetaData getMetaData() + { + return messageMetaData_0_8; + } + + @Override + public long getMessageNumber() + { + return serverMsg.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + int size = messageContent.length - offsetInMessage; + if(dst.remaining() < size) + { + size = dst.remaining(); + } + ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size); + dst.put(buf); + return size; + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + return ByteBuffer.wrap(messageContent, offsetInMessage, size); + } + + @Override + public StoreFuture flushToStore() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + private MessageMetaData convertMetaData(InternalMessage serverMsg, final String bodyMimeType, final int size) + { + + MessagePublishInfo publishInfo = new MessagePublishInfo() + { + @Override + public AMQShortString getExchange() + { + return null; + } + + @Override + public void setExchange(final AMQShortString amqShortString) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isImmediate() + { + return false; + } + + @Override + public boolean isMandatory() + { + return false; + } + + @Override + public AMQShortString getRoutingKey() + { + return null; + } + }; + + + final BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setAppId(serverMsg.getMessageHeader().getAppId()); + props.setContentType(bodyMimeType); + props.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId()); + props.setDeliveryMode(serverMsg.isPersistent() ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT); + props.setExpiration(serverMsg.getExpiration()); + props.setMessageId(serverMsg.getMessageHeader().getMessageId()); + props.setPriority(serverMsg.getMessageHeader().getPriority()); + props.setReplyTo(serverMsg.getMessageHeader().getReplyTo()); + props.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); + props.setUserId(serverMsg.getMessageHeader().getUserId()); + + Map<String,Object> headerProps = new LinkedHashMap<String, Object>(); + + for(String headerName : serverMsg.getMessageHeader().getHeaderNames()) + { + headerProps.put(headerName, serverMsg.getMessageHeader().getHeader(headerName)); + } + + props.setHeaders(FieldTable.convertToFieldTable(headerProps)); + + final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); + return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime()); + } + + + @Override + public String getType() + { + return "Internal to v0-8"; + } + + + public static byte[] convertToBody(Object object) + { + if(object instanceof String) + { + return ((String)object).getBytes(UTF_8); + } + else if(object instanceof byte[]) + { + return (byte[]) object; + } + else if(object instanceof Map) + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeMap((Map)object); + ByteBuffer buf = encoder.segment(); + int remaining = buf.remaining(); + byte[] data = new byte[remaining]; + buf.get(data); + return data; + + } + else if(object instanceof List) + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList((List) object); + ByteBuffer buf = encoder.segment(); + int remaining = buf.remaining(); + byte[] data = new byte[remaining]; + buf.get(data); + return data; + } + else + { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try + { + ObjectOutputStream os = new ObjectOutputStream(bytesOut); + os.writeObject(object); + return bytesOut.toByteArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + public static String getBodyMimeType(Object object) + { + if(object instanceof String) + { + return "text/plain"; + } + else if(object instanceof byte[]) + { + return "application/octet-stream"; + } + else if(object instanceof Map) + { + return "amqp/map"; + } + else if(object instanceof List) + { + return "amqp/list"; + } + else + { + return "application/java-object-stream"; + } + } + +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java new file mode 100644 index 0000000000..6076ff66c7 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesFormatException; + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMessage, InternalMessage> +{ + @Override + public Class<AMQMessage> getInputClass() + { + return AMQMessage.class; + } + + @Override + public Class<InternalMessage> getOutputClass() + { + return InternalMessage.class; + } + + @Override + public InternalMessage convert(AMQMessage serverMessage, VirtualHost vhost) + { + final String mimeType = serverMessage.getMessageHeader().getMimeType(); + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); + + Object body = convertMessageBody(mimeType, data); + + return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), body); + } + + private static Object convertMessageBody(String mimeType, byte[] data) + { + if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) + { + String text = new String(data); + return text; + } + else if("jms/map-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + LinkedHashMap map = new LinkedHashMap(); + final int entries = reader.readIntImpl(); + for (int i = 0; i < entries; i++) + { + try + { + String propName = reader.readStringImpl(); + Object value = reader.readObject(); + + map.put(propName, value); + } + catch (EOFException e) + { + throw new IllegalArgumentException(e); + } + catch (TypedBytesFormatException e) + { + throw new IllegalArgumentException(e); + } + + } + + return map; + + } + else if("amqp/map".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + final Map<String,Object> map = decoder.readMap(); + + return map; + + } + else if("amqp/list".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + return decoder.readList(); + } + else if("jms/stream-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + List list = new ArrayList(); + while (reader.remaining() != 0) + { + try + { + list.add(reader.readObject()); + } + catch (TypedBytesFormatException e) + { + throw new RuntimeException(e); // TODO - Implement + } + catch (EOFException e) + { + throw new RuntimeException(e); // TODO - Implement + } + } + return list; + } + else + { + return data; + + } + } + + @Override + public String getType() + { + return "v0-8 to Internal"; + } +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter new file mode 100644 index 0000000000..d87bc2566f --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.protocol.v0_8.MessageConverter_Internal_to_v0_8 +org.apache.qpid.server.protocol.v0_8.MessageConverter_v0_8_to_Internal
\ No newline at end of file diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java index dded0c70fe..f47525097e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java @@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.queue.SimpleAMQQueue; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.util.BrokerTestHelper; @@ -36,7 +36,7 @@ import java.util.List; public class AcknowledgeTest extends QpidTestCase { private AMQChannel _channel; - private SimpleAMQQueue _queue; + private AMQQueue _queue; private MessageStore _messageStore; private String _queueName; @@ -79,7 +79,7 @@ public class AcknowledgeTest extends QpidTestCase return (InternalTestProtocolSession)_channel.getProtocolSession(); } - private SimpleAMQQueue getQueue() + private AMQQueue getQueue() { return _queue; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java index f6376e56c4..dc687e1075 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java @@ -26,10 +26,8 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -39,7 +37,7 @@ import java.util.List; public class QueueBrowserUsesNoAckTest extends QpidTestCase { private AMQChannel _channel; - private SimpleAMQQueue _queue; + private AMQQueue _queue; private MessageStore _messageStore; private String _queueName; @@ -82,7 +80,7 @@ public class QueueBrowserUsesNoAckTest extends QpidTestCase return (InternalTestProtocolSession)_channel.getProtocolSession(); } - private SimpleAMQQueue getQueue() + private AMQQueue getQueue() { return _queue; } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java new file mode 100644 index 0000000000..f02908391a --- /dev/null +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.UnsignedByte; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.qpid.server.message.internal.InternalMessage; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<InternalMessage> +{ + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + + public Class<InternalMessage> getInputClass() + { + return InternalMessage.class; + } + + + @Override + protected MessageMetaData_1_0 convertMetaData(final InternalMessage serverMessage, + final SectionEncoder sectionEncoder) + { + List<Section> sections = new ArrayList<Section>(3); + Header header = new Header(); + + header.setDurable(serverMessage.isPersistent()); + header.setPriority(UnsignedByte.valueOf(serverMessage.getMessageHeader().getPriority())); + if(serverMessage.getExpiration() != 0l && serverMessage.getArrivalTime() !=0l && serverMessage.getExpiration() >= serverMessage.getArrivalTime()) + { + header.setTtl(UnsignedInteger.valueOf(serverMessage.getExpiration()-serverMessage.getArrivalTime())); + } + + sections.add(header); + + Properties properties = new Properties(); + properties.setCorrelationId(serverMessage.getMessageHeader().getCorrelationId()); + properties.setCreationTime(new Date(serverMessage.getMessageHeader().getTimestamp())); + properties.setMessageId(serverMessage.getMessageHeader().getMessageId()); + final String userId = serverMessage.getMessageHeader().getUserId(); + if(userId != null) + { + properties.setUserId(new Binary(userId.getBytes(UTF_8))); + } + properties.setReplyTo(serverMessage.getMessageHeader().getReplyTo()); + + sections.add(properties); + + if(!serverMessage.getMessageHeader().getHeaderNames().isEmpty()) + { + ApplicationProperties applicationProperties = new ApplicationProperties(serverMessage.getMessageHeader().getHeaderMap() ); + sections.add(applicationProperties); + } + return new MessageMetaData_1_0(sections, sectionEncoder); + + } + + protected Section getBodySection(final InternalMessage serverMessage, final String mimeType) + { + return convertToBody(serverMessage.getMessageBody()); + } + + + @Override + public String getType() + { + return "Internal to v1-0"; + } + + + public Section convertToBody(Object object) + { + if(object instanceof String) + { + return new AmqpValue(object); + } + else if(object instanceof byte[]) + { + return new Data(new Binary((byte[])object)); + } + else if(object instanceof Map) + { + return new AmqpValue(MessageConverter_to_1_0.fixMapValues((Map)object)); + } + else if(object instanceof List) + { + return new AmqpValue(MessageConverter_to_1_0.fixListValues((List)object)); + } + else + { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try + { + ObjectOutputStream os = new ObjectOutputStream(bytesOut); + os.writeObject(object); + return new Data(new Binary(bytesOut.toByteArray())); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + +} diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index a96d951de6..a8a203b247 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -156,7 +156,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } } - private static Map fixMapValues(final Map<String, Object> map) + static Map fixMapValues(final Map<String, Object> map) { for(Map.Entry<String,Object> entry : map.entrySet()) { @@ -165,7 +165,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement return map; } - private static Object fixValue(final Object value) + static Object fixValue(final Object value) { if(value instanceof byte[]) { @@ -185,7 +185,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } } - private static List fixListValues(final List list) + static List fixListValues(final List list) { ListIterator iterator = list.listIterator(); while(iterator.hasNext()) @@ -198,83 +198,88 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData, - final ServerMessage serverMessage, + final M serverMessage, SectionEncoder sectionEncoder) { - final String mimeType = serverMessage.getMessageHeader().getMimeType(); - byte[] data = new byte[(int) serverMessage.getSize()]; - serverMessage.getContent(ByteBuffer.wrap(data), 0); + final String mimeType = serverMessage.getMessageHeader().getMimeType(); + Section bodySection = getBodySection(serverMessage, mimeType); - Section bodySection = convertMessageBody(mimeType, data); + final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder); - final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder); - - return new StoredMessage<MessageMetaData_1_0>() - { - @Override - public MessageMetaData_1_0 getMetaData() - { - return metaData; - } - - @Override - public long getMessageNumber() - { - return serverMessage.getMessageNumber(); - } - - @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override - public int getContent(int offsetInMessage, ByteBuffer dst) - { - ByteBuffer buf = allData.duplicate(); - buf.position(offsetInMessage); - buf = buf.slice(); - int size; - if(dst.remaining()<buf.remaining()) - { - buf.limit(dst.remaining()); - size = dst.remaining(); - } - else + return new StoredMessage<MessageMetaData_1_0>() { - size = buf.remaining(); - } - dst.put(buf); - return size; - } - - @Override - public ByteBuffer getContent(int offsetInMessage, int size) - { - ByteBuffer buf = allData.duplicate(); - buf.position(offsetInMessage); - buf = buf.slice(); - if(size < buf.remaining()) - { - buf.limit(size); - } - return buf; - } + @Override + public MessageMetaData_1_0 getMetaData() + { + return metaData; + } + + @Override + public long getMessageNumber() + { + return serverMessage.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + ByteBuffer buf = allData.duplicate(); + buf.position(offsetInMessage); + buf = buf.slice(); + int size; + if(dst.remaining()<buf.remaining()) + { + buf.limit(dst.remaining()); + size = dst.remaining(); + } + else + { + size = buf.remaining(); + } + dst.put(buf); + return size; + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + ByteBuffer buf = allData.duplicate(); + buf.position(offsetInMessage); + buf = buf.slice(); + if(size < buf.remaining()) + { + buf.limit(size); + } + return buf; + } + + @Override + public StoreFuture flushToStore() + { + throw new UnsupportedOperationException(); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } - @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } + protected Section getBodySection(final M serverMessage, final String mimeType) + { + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } + return convertMessageBody(mimeType, data); + } private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java new file mode 100644 index 0000000000..f639f98dba --- /dev/null +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java @@ -0,0 +1,281 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesFormatException; + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; + +public class MessageConverter_v1_0_to_Internal implements MessageConverter<Message_1_0, InternalMessage> +{ + + static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance(); + static + { + TYPE_REGISTRY.registerTransportLayer(); + TYPE_REGISTRY.registerMessagingLayer(); + TYPE_REGISTRY.registerTransactionLayer(); + TYPE_REGISTRY.registerSecurityLayer(); + } + + @Override + public Class<Message_1_0> getInputClass() + { + return Message_1_0.class; + } + + @Override + public Class<InternalMessage> getOutputClass() + { + return InternalMessage.class; + } + + @Override + public InternalMessage convert(Message_1_0 serverMessage, VirtualHost vhost) + { + final String mimeType = serverMessage.getMessageHeader().getMimeType(); + + + + + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getStoredMessage().getContent(0,ByteBuffer.wrap(data)); + + SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(TYPE_REGISTRY); + + try + { + List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data)); + ListIterator<Section> iterator = sections.listIterator(); + Section previousSection = null; + while(iterator.hasNext()) + { + Section section = iterator.next(); + if(!(section instanceof AmqpValue || section instanceof Data || section instanceof AmqpSequence)) + { + iterator.remove(); + } + else + { + if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue)) + { + throw new RuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence"); + } + else + { + previousSection = section; + } + } + } + + Object bodyObject; + + if(sections.isEmpty()) + { + // should actually be illegal + bodyObject = new byte[0]; + } + else + { + Section firstBodySection = sections.get(0); + if(firstBodySection instanceof AmqpValue) + { + bodyObject = fixObject(((AmqpValue)firstBodySection).getValue()); + } + else if(firstBodySection instanceof Data) + { + int totalSize = 0; + for(Section section : sections) + { + totalSize += ((Data)section).getValue().getLength(); + } + byte[] bodyData = new byte[totalSize]; + ByteBuffer buf = ByteBuffer.wrap(bodyData); + for(Section section : sections) + { + buf.put(((Data)section).getValue().asByteBuffer()); + } + bodyObject = bodyData; + } + else + { + ArrayList totalSequence = new ArrayList(); + for(Section section : sections) + { + totalSequence.addAll(((AmqpSequence)section).getValue()); + } + bodyObject = fixObject(totalSequence); + } + } + return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject); + + } + catch (AmqpErrorException e) + { + throw new RuntimeException(e); + } + + + + + } + + private Object fixObject(final Object value) + { + if(value instanceof Binary) + { + final Binary binaryValue = (Binary) value; + byte[] data = new byte[binaryValue.getLength()]; + binaryValue.asByteBuffer().get(data); + return data; + } + else if(value instanceof List) + { + List listValue = (List) value; + List fixedValue = new ArrayList(listValue.size()); + for(Object o : listValue) + { + fixedValue.add(fixObject(o)); + } + return fixedValue; + } + else if(value instanceof Map) + { + Map<?,?> mapValue = (Map) value; + Map fixedValue = new LinkedHashMap(mapValue.size()); + for(Map.Entry<?,?> entry : mapValue.entrySet()) + { + fixedValue.put(fixObject(entry.getKey()),fixObject(entry.getValue())); + } + return fixedValue; + } + else + { + return value; + } + + } + + private static Object convertMessageBody(String mimeType, byte[] data) + { + if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) + { + String text = new String(data); + return text; + } + else if("jms/map-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + LinkedHashMap map = new LinkedHashMap(); + final int entries = reader.readIntImpl(); + for (int i = 0; i < entries; i++) + { + try + { + String propName = reader.readStringImpl(); + Object value = reader.readObject(); + + map.put(propName, value); + } + catch (EOFException e) + { + throw new IllegalArgumentException(e); + } + catch (TypedBytesFormatException e) + { + throw new IllegalArgumentException(e); + } + + } + + return map; + + } + else if("amqp/map".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + final Map<String,Object> map = decoder.readMap(); + + return map; + + } + else if("amqp/list".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + return decoder.readList(); + } + else if("jms/stream-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + List list = new ArrayList(); + while (reader.remaining() != 0) + { + try + { + list.add(reader.readObject()); + } + catch (TypedBytesFormatException e) + { + throw new RuntimeException(e); // TODO - Implement + } + catch (EOFException e) + { + throw new RuntimeException(e); // TODO - Implement + } + } + return list; + } + else + { + return data; + + } + } + + @Override + public String getType() + { + return "v0-8 to Internal"; + } +} diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 9e0327fe76..f796a4b2e3 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -43,16 +43,7 @@ import org.apache.qpid.amqp_1_0.type.DeliveryState; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Modified; -import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; +import org.apache.qpid.amqp_1_0.type.messaging.*; import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.Error; @@ -391,15 +382,21 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS options.add(Consumer.Option.NO_LOCAL); } - - _consumer.setNoLocal(noLocal); - - try { + final String name; + if(getEndpoint().getTarget() instanceof Target) + { + Target target = (Target) getEndpoint().getTarget(); + name = target.getAddress() == null ? getEndpoint().getName() : target.getAddress(); + } + else + { + name = getEndpoint().getName(); + } _consumer = _queue.addConsumer(_target, messageFilter == null ? null : new SimpleFilterManager(messageFilter), - Message_1_0.class, getEndpoint().getName(), options); + Message_1_0.class, name, options); } catch (AMQException e) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter new file mode 100644 index 0000000000..aa24847805 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.protocol.v1_0.MessageConverter_Internal_to_v1_0 +org.apache.qpid.server.protocol.v1_0.MessageConverter_v1_0_to_Internal
\ No newline at end of file diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java index f1843de8ac..0c83c31ad4 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -61,7 +61,7 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra { if(deliveryProps.hasDeliveryMode()) { - props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT + props.setDeliveryMode((deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT)); } diff --git a/qpid/java/broker-plugins/management-amqp/build.xml b/qpid/java/broker-plugins/management-amqp/build.xml new file mode 100644 index 0000000000..542bb952f1 --- /dev/null +++ b/qpid/java/broker-plugins/management-amqp/build.xml @@ -0,0 +1,32 @@ +<!-- + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + --> +<project name="Qpid Broker-Plugins AMQP Management" default="build"> + <property name="module.depends" value="common broker-core" /> + <property name="module.test.depends" value="qpid-test-utils broker-core/tests" /> + + <property name="module.genpom" value="true"/> + <property name="module.genpom.args" value="-Sqpid-common=provided -Sqpid-broker-core=provided"/> + + <property name="broker.plugin" value="true"/> + <property name="broker-plugins-management-amqp.libs" value="" /> + + <import file="../../module.xml" /> + + <target name="bundle" depends="bundle-tasks"/> +</project> diff --git a/qpid/java/broker-plugins/management-amqp/pom.xml b/qpid/java/broker-plugins/management-amqp/pom.xml new file mode 100644 index 0000000000..83873a61f2 --- /dev/null +++ b/qpid/java/broker-plugins/management-amqp/pom.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>qpid-parent</artifactId> + <groupId>org.apache.qpid</groupId> + <version>1.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>qpid-broker-plugins-management-amqp</artifactId> + <version>0.28-SNAPSHOT</version> + <name>AMQP Management Protocol Plug-in</name> + <description>AMQP Management broker plug-in</description> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + </dependencies> + + <build> + </build> + +</project> diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java new file mode 100644 index 0000000000..10a16faa56 --- /dev/null +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagedEntityType.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.amqp; + +import java.util.Arrays; + +class ManagedEntityType +{ + private final String _name; + private final ManagedEntityType[] _parents; + private final String[] _attributes; + private final String[] _operations; + + ManagedEntityType(final String name, + final ManagedEntityType[] parents, + final String[] attributes, + final String[] operations) + { + _name = name; + _parents = parents; + _attributes = attributes; + _operations = operations; + } + + public String getName() + { + return _name; + } + + public ManagedEntityType[] getParents() + { + return _parents; + } + + public String[] getAttributes() + { + return _attributes; + } + + public String[] getOperations() + { + return _operations; + } + + @Override + public String toString() + { + return "ManagedEntityType{" + + "name='" + _name + '\'' + + ", parents=" + Arrays.toString(_parents) + + ", attributes=" + Arrays.toString(_attributes) + + ", operations=" + Arrays.toString(_operations) + + '}'; + } +} diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java new file mode 100644 index 0000000000..1c1c72dd0b --- /dev/null +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -0,0 +1,1402 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.amqp; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.message.internal.InternalMessageHeader; +import org.apache.qpid.server.model.AmqpManagement; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Model; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.plugin.SystemNodeCreator; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.nio.charset.Charset; +import java.security.AccessControlException; +import java.text.MessageFormat; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +class ManagementNode implements MessageSource<ManagementNodeConsumer,ManagementNode>, MessageDestination +{ + + public static final String NAME_ATTRIBUTE = "name"; + public static final String IDENTITY_ATTRIBUTE = "identity"; + public static final String TYPE_ATTRIBUTE = "type"; + public static final String OPERATION_HEADER = "operation"; + public static final String SELF_NODE_NAME = "self"; + public static final String MANAGEMENT_TYPE = "org.amqp.management"; + public static final String GET_TYPES = "GET-TYPES"; + public static final String GET_ATTRIBUTES = "GET-ATTRIBUTES"; + public static final String GET_OPERATIONS = "GET-OPERATIONS"; + public static final String QUERY = "QUERY"; + public static final String ENTITY_TYPES_HEADER = "entityTypes"; + public static final String STATUS_CODE_HEADER = "statusCode"; + public static final int STATUS_CODE_OK = 200; + public static final String ATTRIBUTES_HEADER = "attributes"; + public static final String OFFSET_HEADER = "offset"; + public static final String COUNT_HEADER = "count"; + public static final String MANAGEMENT_NODE_NAME = "$management"; + public static final String CREATE_OPERATION = "CREATE"; + public static final String READ_OPERATION = "READ"; + public static final String UPDATE_OPERATION = "UPDATE"; + public static final String DELETE_OPERATION = "DELETE"; + public static final String STATUS_DESCRIPTION_HEADER = "statusDescription"; + public static final int NOT_FOUND_STATUS_CODE = 404; + public static final int NOT_IMPLEMENTED_STATUS_CODE = 501; + public static final int STATUS_CODE_NO_CONTENT = 204; + public static final int STATUS_CODE_FORBIDDEN = 403; + public static final int STATUS_CODE_BAD_REQUEST = 400; + public static final int STATUS_CODE_INTERNAL_ERROR = 500; + + + private final VirtualHost _virtualHost; + + private final UUID _id; + + private final CopyOnWriteArrayList<ConsumerRegistrationListener<ManagementNode>> _consumerRegistrationListeners = + new CopyOnWriteArrayList<ConsumerRegistrationListener<ManagementNode>>(); + + private final SystemNodeCreator.SystemNodeRegistry _registry; + private final ConfiguredObject _managedObject; + private Map<String, ManagementNodeConsumer> _consumers = new ConcurrentHashMap<String, ManagementNodeConsumer>(); + + private Map<String,ManagedEntityType> _entityTypes = Collections.synchronizedMap(new LinkedHashMap<String, ManagedEntityType>()); + + private Map<ManagedEntityType,Map<String,ConfiguredObject>> _entities = Collections.synchronizedMap(new LinkedHashMap<ManagedEntityType,Map<String,ConfiguredObject>>()); + + + public ManagementNode(final SystemNodeCreator.SystemNodeRegistry registry, + final ConfiguredObject configuredObject) + { + _virtualHost = registry.getVirtualHost(); + _registry = registry; + final String name = configuredObject.getId() + MANAGEMENT_NODE_NAME; + _id = UUID.nameUUIDFromBytes(name.getBytes(Charset.defaultCharset())); + + + _managedObject = configuredObject; + + populateTypeMetaData(configuredObject.getClass(), false); + + configuredObject.addChangeListener(new ModelObjectListener()); + + final Class managementClass = getManagementClass(_managedObject.getClass()); + _entities.get(_entityTypes.get(managementClass.getName())).put(_managedObject.getName(), _managedObject); + + Collection<Class<? extends ConfiguredObject>> childClasses = Model.getInstance().getChildTypes(managementClass); + for(Class<? extends ConfiguredObject> childClass : childClasses) + { + if(getManagementClass(childClass) != null) + { + for(ConfiguredObject child : _managedObject.getChildren(childClass)) + { + _entities.get(_entityTypes.get(getManagementClass(childClass).getName())).put(child.getName(), child); + } + } + } + + } + + private Class getManagementClass(Class objectClass) + { + List<Class> allClasses = new ArrayList<Class>(); + allClasses.add(objectClass); + allClasses.addAll(Arrays.asList(objectClass.getInterfaces())); + allClasses.add(objectClass.getSuperclass()); + for(Class clazz : allClasses) + { + AmqpManagement annotation = (AmqpManagement) clazz.getAnnotation(AmqpManagement.class); + if(annotation != null) + { + return clazz; + } + } + return null; + } + + private boolean populateTypeMetaData(final Class<? extends ConfiguredObject> objectClass, boolean allowCreate) + { + Class clazz = getManagementClass(objectClass); + if( clazz != null) + { + AmqpManagement annotation = (AmqpManagement) clazz.getAnnotation(AmqpManagement.class); + populateTypeMetaData(clazz, annotation); + return true; + } + else + { + return false; + } + } + + private ManagedEntityType populateTypeMetaData(Class clazz, + final AmqpManagement entityType) + { + + ManagedEntityType managedEntityType = _entityTypes.get(clazz.getName()); + + if(managedEntityType == null) + { + List<String> opsList = new ArrayList<String>(Arrays.asList(entityType.operations())); + if(entityType.creatable()) + { + boolean isCreatableChild = false; + for(Class<? extends ConfiguredObject> parentConfig : Model.getInstance().getParentTypes(clazz)) + { + isCreatableChild = parentConfig.isAssignableFrom(_managedObject.getClass()); + if(isCreatableChild) + { + opsList.add(CREATE_OPERATION); + break; + } + } + } + opsList.addAll(Arrays.asList(READ_OPERATION, UPDATE_OPERATION, DELETE_OPERATION)); + + Set<ManagedEntityType> parentSet = new HashSet<ManagedEntityType>(); + + List<Class> allClasses = new ArrayList<Class>(Arrays.asList(clazz.getInterfaces())); + if(clazz.getSuperclass() != null) + { + allClasses.add(clazz.getSuperclass()); + } + + for(Class parentClazz : allClasses) + { + if(parentClazz.getAnnotation(AmqpManagement.class) != null) + { + ManagedEntityType parentType = populateTypeMetaData(parentClazz, + (AmqpManagement) parentClazz.getAnnotation( + AmqpManagement.class) + ); + parentSet.add(parentType); + parentSet.addAll(Arrays.asList(parentType.getParents())); + + } + } + managedEntityType = new ManagedEntityType(clazz.getName(), parentSet.toArray(new ManagedEntityType[parentSet.size()]), entityType.attributes(), opsList.toArray(new String[opsList.size()])); + _entityTypes.put(clazz.getName(),managedEntityType); + _entities.put(managedEntityType, Collections.synchronizedMap(new LinkedHashMap<String, ConfiguredObject>())); + + if(ConfiguredObject.class.isAssignableFrom(clazz)) + { + Collection<Class<? extends ConfiguredObject>> childTypes = Model.getInstance().getChildTypes(clazz); + for(Class<? extends ConfiguredObject> childClass : childTypes) + { + populateTypeMetaData(childClass, true); + } + } + + } + + return managedEntityType; + + } + + @Override + public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction) + { + + @SuppressWarnings("unchecked") + MessageConverter<M, InternalMessage> converter = + MessageConverterRegistry.getConverter((Class<M>)message.getClass(), InternalMessage.class); + + final InternalMessage msg = converter.<M>convert(message, _virtualHost); + + if(validateMessage(msg)) + { + txn.addPostTransactionAction(new ServerTransaction.Action() + { + @Override + public void postCommit() + { + enqueue(msg, instanceProperties, postEnqueueAction); + } + + @Override + public void onRollback() + { + + } + }); + + return 1; + } + else + { + return 0; + } + } + + private boolean validateMessage(final ServerMessage message) + { + AMQMessageHeader header = message.getMessageHeader(); + return containsStringHeader(header, TYPE_ATTRIBUTE) && containsStringHeader(header, OPERATION_HEADER) + && (containsStringHeader(header, NAME_ATTRIBUTE) || containsStringHeader(header, IDENTITY_ATTRIBUTE)); + } + + private boolean containsStringHeader(final AMQMessageHeader header, String name) + { + return header.containsHeader(name) && header.getHeader(name) instanceof String; + } + + synchronized void enqueue(InternalMessage message, InstanceProperties properties, Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction) + { + if(postEnqueueAction != null) + { + postEnqueueAction.performAction(new ConsumedMessageInstance(message, properties)); + } + + + + String name = (String) message.getMessageHeader().getHeader(NAME_ATTRIBUTE); + String id = (String) message.getMessageHeader().getHeader(IDENTITY_ATTRIBUTE); + String type = (String) message.getMessageHeader().getHeader(TYPE_ATTRIBUTE); + String operation = (String) message.getMessageHeader().getHeader(OPERATION_HEADER); + + InternalMessage response; + + if(SELF_NODE_NAME.equals(name) && type.equals(MANAGEMENT_TYPE)) + { + response = performManagementOperation(message); + } + else if(CREATE_OPERATION.equals(operation)) + { + response = performCreateOperation(message, type); + } + else + { + + ConfiguredObject entity = findSubject(name, id, type); + + if(entity != null) + { + response = performOperation(message, entity); + } + else + { + if(id != null) + { + response = createFailureResponse(message, + NOT_FOUND_STATUS_CODE, + "No entity with id {0} of type {1} found", id, type); + } + else + { + response = createFailureResponse(message, + NOT_FOUND_STATUS_CODE, + "No entity with name {0} of type {1} found", name, type); + } + } + } + + + ManagementNodeConsumer consumer = _consumers.get(message.getMessageHeader().getReplyTo()); + if(consumer != null) + { + // TODO - check same owner + consumer.send(response); + } + // TODO - route to a queue + + } + + private InternalMessage performCreateOperation(final InternalMessage message, final String type) + { + InternalMessage response; + ManagedEntityType entityType = _entityTypes.get(type); + if(type != null) + { + if(Arrays.asList(entityType.getOperations()).contains(CREATE_OPERATION)) + { + Object messageBody = message.getMessageBody(); + if(messageBody instanceof Map) + { + try + { + + Class<? extends ConfiguredObject> clazz = + (Class<? extends ConfiguredObject>) Class.forName(type); + try + { + ConfiguredObject child = _managedObject.createChild(clazz, (Map) messageBody); + if(child == null) + { + child = _entities.get(entityType).get(message.getMessageHeader().getHeader(NAME_ATTRIBUTE)); + } + response = performReadOperation(message, child); + } + catch(RuntimeException e) + { + if (e instanceof AccessControlException || e.getCause() instanceof AMQSecurityException) + { + response = createFailureResponse(message, STATUS_CODE_FORBIDDEN, e.getMessage()); + } + else + { + throw e; + } + } + } + catch (ClassNotFoundException e) + { + response = createFailureResponse(message, + STATUS_CODE_INTERNAL_ERROR, "Unable to instantiate an instance of {0} ", type); + } + } + else + { + response = createFailureResponse(message, + STATUS_CODE_BAD_REQUEST, + "The message body in the request was not of the correct type"); + } + } + else + { + response = createFailureResponse(message, + STATUS_CODE_FORBIDDEN, + "Cannot CREATE entities of type {0}", type); + } + } + else + { + response = createFailureResponse(message, + NOT_FOUND_STATUS_CODE, + "Unknown type {0}",type); + } + return response; + } + + private InternalMessage performOperation(final InternalMessage requestMessage, final ConfiguredObject entity) + { + String operation = (String) requestMessage.getMessageHeader().getHeader(OPERATION_HEADER); + + if(READ_OPERATION.equals(operation)) + { + return performReadOperation(requestMessage, entity); + } + else if(DELETE_OPERATION.equals(operation)) + { + return performDeleteOperation(requestMessage, entity); + } + else if(UPDATE_OPERATION.equals(operation)) + { + return performUpdateOperation(requestMessage, entity); + } + else + { + return createFailureResponse(requestMessage, NOT_IMPLEMENTED_STATUS_CODE, "Unable to perform the {0} operation",operation); + } + } + + private InternalMessage performReadOperation(final InternalMessage requestMessage, final ConfiguredObject entity) + { + final InternalMessageHeader requestHeader = requestMessage.getMessageHeader(); + final MutableMessageHeader responseHeader = new MutableMessageHeader(); + responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null + ? requestHeader.getMessageId() + : requestHeader.getCorrelationId()); + responseHeader.setMessageId(UUID.randomUUID().toString()); + responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName()); + responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString()); + responseHeader.setHeader(STATUS_CODE_HEADER,STATUS_CODE_OK); + final String type = getManagementClass(entity.getClass()).getName(); + responseHeader.setHeader(TYPE_ATTRIBUTE, type); + + Map<String,Object> responseBody = new LinkedHashMap<String, Object>(); + final ManagedEntityType entityType = _entityTypes.get(type); + for(String attribute : entityType.getAttributes()) + { + responseBody.put(attribute, fixValue(entity.getAttribute(attribute))); + } + + return InternalMessage.createMapMessage(_virtualHost.getMessageStore(),responseHeader, responseBody); + } + + + private InternalMessage performDeleteOperation(final InternalMessage requestMessage, final ConfiguredObject entity) + { + final InternalMessageHeader requestHeader = requestMessage.getMessageHeader(); + final MutableMessageHeader responseHeader = new MutableMessageHeader(); + responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null + ? requestHeader.getMessageId() + : requestHeader.getCorrelationId()); + responseHeader.setMessageId(UUID.randomUUID().toString()); + responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName()); + responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString()); + final String type = getManagementClass(entity.getClass()).getName(); + responseHeader.setHeader(TYPE_ATTRIBUTE, type); + try + { + entity.setDesiredState(entity.getActualState(),State.DELETED); + responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_NO_CONTENT); + } + catch(RuntimeException e) + { + if (e instanceof AccessControlException || e.getCause() instanceof AMQSecurityException) + { + responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_FORBIDDEN); + } + else + { + throw e; + } + + } + + return InternalMessage.createMapMessage(_virtualHost.getMessageStore(),responseHeader, Collections.emptyMap()); + } + + + private InternalMessage performUpdateOperation(final InternalMessage requestMessage, final ConfiguredObject entity) + { + final InternalMessageHeader requestHeader = requestMessage.getMessageHeader(); + final MutableMessageHeader responseHeader = new MutableMessageHeader(); + responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null + ? requestHeader.getMessageId() + : requestHeader.getCorrelationId()); + responseHeader.setMessageId(UUID.randomUUID().toString()); + responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName()); + responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString()); + final String type = getManagementClass(entity.getClass()).getName(); + responseHeader.setHeader(TYPE_ATTRIBUTE, type); + + Object messageBody = requestMessage.getMessageBody(); + if(messageBody instanceof Map) + { + try + { + entity.setAttributes((Map)messageBody); + return performReadOperation(requestMessage, entity); + } + catch(RuntimeException e) + { + if (e instanceof AccessControlException || e.getCause() instanceof AMQSecurityException) + { + return createFailureResponse(requestMessage, STATUS_CODE_FORBIDDEN, e.getMessage()); + } + else + { + throw e; + } + } + } + else + { + return createFailureResponse(requestMessage, + STATUS_CODE_BAD_REQUEST, + "The message body in the request was not of the correct type"); + } + + + } + + private ConfiguredObject findSubject(final String name, final String id, final String type) + { + ConfiguredObject subject; + ManagedEntityType met = _entityTypes.get(type); + if(met == null) + { + return null; + } + + subject = findSubject(name, id, met); + if(subject == null) + { + ArrayList<ManagedEntityType> allTypes = new ArrayList<ManagedEntityType>(_entityTypes.values()); + for(ManagedEntityType entityType : allTypes) + { + if(Arrays.asList(entityType.getParents()).contains(met)) + { + subject = findSubject(name, id, entityType); + if(subject != null) + { + return subject; + } + } + } + } + return subject; + } + + private ConfiguredObject findSubject(final String name, final String id, final ManagedEntityType entityType) + { + + Map<String, ConfiguredObject> objects = _entities.get(entityType); + if(name != null) + { + ConfiguredObject subject = objects.get(name); + if(subject != null) + { + return subject; + } + } + else + { + final Collection<ConfiguredObject> values = new ArrayList<ConfiguredObject>(objects.values()); + for(ConfiguredObject o : values) + { + if(o.getId().toString().equals(id)) + { + return o; + } + } + } + return null; + } + + private InternalMessage createFailureResponse(final InternalMessage requestMessage, + final int statusCode, + final String stateDescription, + String... params) + { + final InternalMessageHeader requestHeader = requestMessage.getMessageHeader(); + final MutableMessageHeader responseHeader = new MutableMessageHeader(); + responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null + ? requestHeader.getMessageId() + : requestHeader.getCorrelationId()); + responseHeader.setMessageId(UUID.randomUUID().toString()); + for(String header : requestHeader.getHeaderNames()) + { + responseHeader.setHeader(header, requestHeader.getHeader(header)); + } + responseHeader.setHeader(STATUS_CODE_HEADER, statusCode); + responseHeader.setHeader(STATUS_DESCRIPTION_HEADER, MessageFormat.format(stateDescription, params)); + return InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), responseHeader, new byte[0]); + + } + + private InternalMessage performManagementOperation(final InternalMessage msg) + { + final InternalMessage responseMessage; + final InternalMessageHeader requestHeader = msg.getMessageHeader(); + final MutableMessageHeader responseHeader = new MutableMessageHeader(); + responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null + ? requestHeader.getMessageId() + : requestHeader.getCorrelationId()); + responseHeader.setMessageId(UUID.randomUUID().toString()); + + + String operation = (String) requestHeader.getHeader(OPERATION_HEADER); + if(GET_TYPES.equals(operation)) + { + responseMessage = performGetTypes(requestHeader, responseHeader); + } + else if(GET_ATTRIBUTES.equals(operation)) + { + responseMessage = performGetAttributes(requestHeader, responseHeader); + } + else if(GET_OPERATIONS.equals(operation)) + { + responseMessage = performGetOperations(requestHeader, responseHeader); + } + else if(QUERY.equals(operation)) + { + responseMessage = performQuery(requestHeader, responseHeader); + } + else + { + responseMessage = InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), requestHeader, new byte[0]); + } + return responseMessage; + } + + private InternalMessage performGetTypes(final InternalMessageHeader requestHeader, + final MutableMessageHeader responseHeader) + { + final InternalMessage responseMessage; + List<String> restriction; + if(requestHeader.containsHeader(ENTITY_TYPES_HEADER)) + { + restriction = (List<String>) requestHeader.getHeader(ENTITY_TYPES_HEADER); + } + else + { + restriction = null; + } + + responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); + Map<String,Object> responseMap = new LinkedHashMap<String, Object>(); + Map<String,ManagedEntityType> entityMapCopy; + synchronized (_entityTypes) + { + entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes); + } + + for(ManagedEntityType type : entityMapCopy.values()) + { + if(restriction == null || meetsIndirectRestriction(type,restriction)) + { + final ManagedEntityType[] parents = type.getParents(); + List<String> parentNames = new ArrayList<String>(); + if(parents != null) + { + for(ManagedEntityType parent : parents) + { + parentNames.add(parent.getName()); + } + } + responseMap.put(type.getName(), parentNames); + } + } + responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap); + return responseMessage; + } + + private InternalMessage performGetAttributes(final InternalMessageHeader requestHeader, + final MutableMessageHeader responseHeader) + { + final InternalMessage responseMessage; + List<String> restriction; + if(requestHeader.containsHeader(ENTITY_TYPES_HEADER)) + { + restriction = (List<String>) requestHeader.getHeader(ENTITY_TYPES_HEADER); + } + else + { + restriction = null; + } + + responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); + Map<String,Object> responseMap = new LinkedHashMap<String, Object>(); + Map<String,ManagedEntityType> entityMapCopy; + synchronized (_entityTypes) + { + entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes); + } + + for(ManagedEntityType type : entityMapCopy.values()) + { + if(restriction == null || restriction.contains(type.getName())) + { + responseMap.put(type.getName(), Arrays.asList(type.getAttributes())); + } + } + responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap); + return responseMessage; + } + + + private InternalMessage performGetOperations(final InternalMessageHeader requestHeader, + final MutableMessageHeader responseHeader) + { + final InternalMessage responseMessage; + List<String> restriction; + if(requestHeader.containsHeader(ENTITY_TYPES_HEADER)) + { + restriction = (List<String>) requestHeader.getHeader(ENTITY_TYPES_HEADER); + } + else + { + restriction = null; + } + + responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); + Map<String,Object> responseMap = new LinkedHashMap<String, Object>(); + Map<String,ManagedEntityType> entityMapCopy; + synchronized (_entityTypes) + { + entityMapCopy = new LinkedHashMap<String, ManagedEntityType>(_entityTypes); + } + + for(ManagedEntityType type : entityMapCopy.values()) + { + if(restriction == null || restriction.contains(type.getName())) + { + responseMap.put(type.getName(), Arrays.asList(type.getOperations())); + } + } + responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap); + return responseMessage; + } + + private InternalMessage performQuery(final InternalMessageHeader requestHeader, + final MutableMessageHeader responseHeader) + { + final InternalMessage responseMessage; + List<String> restriction; + List<String> attributes; + int offset; + int count; + + if(requestHeader.containsHeader(ENTITY_TYPES_HEADER)) + { + restriction = (List<String>) requestHeader.getHeader(ENTITY_TYPES_HEADER); + responseHeader.setHeader(ENTITY_TYPES_HEADER, restriction); + } + else + { + restriction = new ArrayList<String>(_entityTypes.keySet()); + } + + + if(requestHeader.containsHeader(ATTRIBUTES_HEADER)) + { + attributes = (List<String>) requestHeader.getHeader(ATTRIBUTES_HEADER); + } + else + { + LinkedHashMap<String,Void> attributeSet = new LinkedHashMap<String, Void>(); + for(String entityType : restriction) + { + ManagedEntityType type = _entityTypes.get(entityType); + if(type != null) + { + for(String attributeName : type.getAttributes()) + { + attributeSet.put(attributeName, null); + } + } + } + attributes = new ArrayList<String>(attributeSet.keySet()); + + } + + if(requestHeader.containsHeader(OFFSET_HEADER)) + { + offset = ((Number) requestHeader.getHeader(OFFSET_HEADER)).intValue(); + responseHeader.setHeader(OFFSET_HEADER,offset); + } + else + { + offset = 0; + } + + if(requestHeader.containsHeader(COUNT_HEADER)) + { + count = ((Number) requestHeader.getHeader(COUNT_HEADER)).intValue(); + } + else + { + count = Integer.MAX_VALUE; + } + + responseHeader.setHeader(ATTRIBUTES_HEADER, attributes); + + responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); + List<List<Object>> responseList = new ArrayList<List<Object>>(); + + int rowNo = 0; + for(String type : restriction) + { + ManagedEntityType entityType = _entityTypes.get(type); + if(entityType != null) + { + Map<String, ConfiguredObject> entityMap = _entities.get(entityType); + if(entityMap != null) + { + List<ConfiguredObject> entities; + synchronized(entityMap) + { + entities = new ArrayList<ConfiguredObject>(entityMap.values()); + } + for(ConfiguredObject entity : entities) + { + if(rowNo++ >= offset) + { + Object[] attrValue = new Object[attributes.size()]; + int col = 0; + for(String attr : attributes) + { + Object value; + if(TYPE_ATTRIBUTE.equals(attr)) + { + value = entityType.getName(); + } + else + { + value = fixValue(entity.getAttribute(attr)); + } + attrValue[col++] = value; + } + responseList.add(Arrays.asList(attrValue)); + } + if(responseList.size()==count) + { + break; + } + } + } + } + + if(responseList.size()==count) + { + break; + } + } + responseHeader.setHeader(COUNT_HEADER, count); + responseMessage = InternalMessage.createListMessage(_virtualHost.getMessageStore(), + responseHeader, + responseList); + return responseMessage; + } + + private Object fixValue(final Object value) + { + Object fixedValue; + if(value instanceof Enum) + { + fixedValue = value.toString(); + } + else if(value instanceof Map) + { + Map<Object, Object> oldValue = (Map<Object, Object>) value; + Map<Object, Object> newValue = new LinkedHashMap<Object, Object>(); + for(Map.Entry<Object, Object> entry : oldValue.entrySet()) + { + newValue.put(fixValue(entry.getKey()),fixValue(entry.getValue())); + } + fixedValue = newValue; + } + else if(value instanceof Collection) + { + Collection oldValue = (Collection) value; + List newValue = new ArrayList(oldValue.size()); + for(Object o : oldValue) + { + newValue.add(fixValue(o)); + } + fixedValue = newValue; + } + else if(value != null && value.getClass().isArray() && !(value instanceof byte[])) + { + fixedValue = fixValue(Arrays.asList((Object[])value)); + } + else + { + fixedValue = value; + } + return fixedValue; + + } + + + private boolean meetsIndirectRestriction(final ManagedEntityType type, final List<String> restriction) + { + if(restriction.contains(type.getName())) + { + return true; + } + if(type.getParents() != null) + { + for(ManagedEntityType parent : type.getParents()) + { + if(meetsIndirectRestriction(parent, restriction)) + { + return true; + } + } + } + return false; + } + + @Override + public synchronized <T extends ConsumerTarget> ManagementNodeConsumer addConsumer(final T target, + final FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + final EnumSet<Consumer.Option> options) throws AMQException + { + + final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target); + target.consumerAdded(managementNodeConsumer); + _consumers.put(consumerName, managementNodeConsumer); + for(ConsumerRegistrationListener<ManagementNode> listener : _consumerRegistrationListeners) + { + listener.consumerAdded(this, managementNodeConsumer); + } + return managementNodeConsumer; + } + + @Override + public synchronized Collection<ManagementNodeConsumer> getConsumers() + { + return new ArrayList<ManagementNodeConsumer>(_consumers.values()); + } + + @Override + public void addConsumerRegistrationListener(final ConsumerRegistrationListener<ManagementNode> listener) + { + _consumerRegistrationListeners.add(listener); + } + + @Override + public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) + { + _consumerRegistrationListeners.remove(listener); + } + + @Override + public AuthorizationHolder getAuthorizationHolder() + { + return null; + } + + @Override + public void setAuthorizationHolder(final AuthorizationHolder principalHolder) + { + + } + + @Override + public void setExclusiveOwningSession(final AMQSessionModel owner) + { + + } + + @Override + public AMQSessionModel getExclusiveOwningSession() + { + return null; + } + + @Override + public boolean isExclusive() + { + return false; + } + + @Override + public String getName() + { + return MANAGEMENT_NODE_NAME; + } + + @Override + public UUID getId() + { + return _id; + } + + @Override + public boolean isDurable() + { + return false; + } + + private class ConsumedMessageInstance implements MessageInstance<ConsumedMessageInstance,Consumer> + { + private final ServerMessage _message; + private final InstanceProperties _properties; + + public ConsumedMessageInstance(final ServerMessage message, + final InstanceProperties properties) + { + _message = message; + _properties = properties; + } + + @Override + public int getDeliveryCount() + { + return 0; + } + + @Override + public void incrementDeliveryCount() + { + + } + + @Override + public void decrementDeliveryCount() + { + + } + + @Override + public void addStateChangeListener(final StateChangeListener<? super ConsumedMessageInstance, State> listener) + { + + } + + @Override + public boolean removeStateChangeListener(final StateChangeListener<? super ConsumedMessageInstance, State> listener) + { + return false; + } + + + @Override + public boolean acquiredByConsumer() + { + return false; + } + + @Override + public boolean isAcquiredBy(final Consumer consumer) + { + return false; + } + + @Override + public void setRedelivered() + { + + } + + @Override + public boolean isRedelivered() + { + return false; + } + + @Override + public Consumer getDeliveredConsumer() + { + return null; + } + + @Override + public void reject() + { + + } + + @Override + public boolean isRejectedBy(final Consumer consumer) + { + return false; + } + + @Override + public boolean getDeliveredToConsumer() + { + return true; + } + + @Override + public boolean expired() throws AMQException + { + return false; + } + + @Override + public boolean acquire(final Consumer sub) + { + return false; + } + + @Override + public int getMaximumDeliveryCount() + { + return 0; + } + + @Override + public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, + final ServerTransaction txn) + { + return 0; + } + + + @Override + public Filterable asFilterable() + { + return null; + } + + @Override + public boolean isAvailable() + { + return false; + } + + @Override + public boolean acquire() + { + return false; + } + + @Override + public boolean isAcquired() + { + return false; + } + + @Override + public void release() + { + + } + + @Override + public boolean resend() throws AMQException + { + return false; + } + + @Override + public void delete() + { + + } + + @Override + public boolean isDeleted() + { + return false; + } + + @Override + public ServerMessage getMessage() + { + return _message; + } + + @Override + public InstanceProperties getInstanceProperties() + { + return _properties; + } + + @Override + public TransactionLogResource getOwningResource() + { + return ManagementNode.this; + } + } + + private class ModelObjectListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + _registry.removeSystemNode(ManagementNode.this); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + final ManagedEntityType entityType = _entityTypes.get(getManagementClass(child.getClass()).getName()); + if(entityType != null) + { + _entities.get(entityType).put(child.getName(), child); + } + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + final ManagedEntityType entityType = _entityTypes.get(getManagementClass(child.getClass()).getName()); + if(entityType != null) + { + _entities.get(entityType).remove(child.getName()); + } + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } + + private static class MutableMessageHeader implements AMQMessageHeader + { + private final LinkedHashMap<String, Object> _headers = new LinkedHashMap<String, Object>(); + private String _correlationId; + private long _expiration; + private String _userId; + private String _appId; + private String _messageId; + private String _mimeType; + private String _encoding; + private byte _priority; + private long _timestamp; + private String _type; + private String _replyTo; + + public void setCorrelationId(final String correlationId) + { + _correlationId = correlationId; + } + + public void setExpiration(final long expiration) + { + _expiration = expiration; + } + + public void setUserId(final String userId) + { + _userId = userId; + } + + public void setAppId(final String appId) + { + _appId = appId; + } + + public void setMessageId(final String messageId) + { + _messageId = messageId; + } + + public void setMimeType(final String mimeType) + { + _mimeType = mimeType; + } + + public void setEncoding(final String encoding) + { + _encoding = encoding; + } + + public void setPriority(final byte priority) + { + _priority = priority; + } + + public void setTimestamp(final long timestamp) + { + _timestamp = timestamp; + } + + public void setType(final String type) + { + _type = type; + } + + public void setReplyTo(final String replyTo) + { + _replyTo = replyTo; + } + + public String getCorrelationId() + { + return _correlationId; + } + + public long getExpiration() + { + return _expiration; + } + + public String getUserId() + { + return _userId; + } + + public String getAppId() + { + return _appId; + } + + public String getMessageId() + { + return _messageId; + } + + public String getMimeType() + { + return _mimeType; + } + + public String getEncoding() + { + return _encoding; + } + + public byte getPriority() + { + return _priority; + } + + public long getTimestamp() + { + return _timestamp; + } + + public String getType() + { + return _type; + } + + public String getReplyTo() + { + return _replyTo; + } + + @Override + public Object getHeader(final String name) + { + return _headers.get(name); + } + + @Override + public boolean containsHeaders(final Set<String> names) + { + return _headers.keySet().containsAll(names); + } + + @Override + public boolean containsHeader(final String name) + { + return _headers.containsKey(name); + } + + @Override + public Collection<String> getHeaderNames() + { + return Collections.unmodifiableCollection(_headers.keySet()); + } + + public void setHeader(String header, Object value) + { + _headers.put(header,value); + } + + } +} diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java new file mode 100644 index 0000000000..98bb20d364 --- /dev/null +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -0,0 +1,242 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.amqp; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.util.StateChangeListener; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +class ManagementNodeConsumer implements Consumer +{ + private final long _id = Consumer.SUB_ID_GENERATOR.getAndIncrement(); + private final ManagementNode _managementNode; + private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>()); + private final ConsumerTarget _target; + private final Lock _stateChangeLock = new ReentrantLock(); + private final String _name; + private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> _targetChangeListener = new TargetChangeListener(); + + + public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target) + { + _name = consumerName; + _managementNode = managementNode; + _target = target; + target.setStateListener(_targetChangeListener); + } + + @Override + public void externalStateChange() + { + + } + + @Override + public long getBytesOut() + { + return 0; + } + + @Override + public long getMessagesOut() + { + return 0; + } + + @Override + public long getUnacknowledgedBytes() + { + return 0; + } + + @Override + public long getUnacknowledgedMessages() + { + return 0; + } + + @Override + public AMQSessionModel getSessionModel() + { + return _target.getSessionModel(); + } + + @Override + public void setNoLocal(final boolean noLocal) + { + } + + @Override + public long getId() + { + return _id; + } + + @Override + public boolean isSuspended() + { + return false; + } + + @Override + public boolean isClosed() + { + return false; + } + + @Override + public boolean acquires() + { + return true; + } + + @Override + public boolean seesRequeues() + { + return false; + } + + @Override + public void close() throws AMQException + { + + } + + @Override + public boolean trySendLock() + { + return _stateChangeLock.tryLock(); + } + + @Override + public void getSendLock() + { + _stateChangeLock.lock(); + } + + @Override + public void releaseSendLock() + { + _stateChangeLock.unlock(); + } + + + @Override + public boolean isActive() + { + return false; + } + + @Override + public String getName() + { + return _name; + } + + @Override + public void flush() throws AMQException + { + + } + + ManagementNode getManagementNode() + { + return _managementNode; + } + + void send(final InternalMessage response) + { + getSendLock(); + try + { + final ManagementResponse responseEntry = new ManagementResponse(this, response); + if(_queue.isEmpty() && _target.allocateCredit(response)) + { + _target.send(responseEntry,false); + } + else + { + _queue.add(responseEntry); + } + } + catch (AMQException e) + { + e.printStackTrace(); + } + finally + { + releaseSendLock(); + } + } + + private class TargetChangeListener implements StateChangeListener<ConsumerTarget, ConsumerTarget.State> + { + @Override + public void stateChanged(final ConsumerTarget object, + final ConsumerTarget.State oldState, + final ConsumerTarget.State newState) + { + if(newState == ConsumerTarget.State.ACTIVE) + { + deliverMessages(); + } + } + } + + private void deliverMessages() + { + getSendLock(); + try + { + while(!_queue.isEmpty()) + { + + final ManagementResponse managementResponse = _queue.get(0); + if(!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage())) + { + _queue.remove(0); + _target.send(managementResponse,false); + } + else + { + break; + } + } + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + finally + { + releaseSendLock(); + } + } +} diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java new file mode 100644 index 0000000000..cdb71f4859 --- /dev/null +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeCreator.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.amqp; + +import org.apache.qpid.server.plugin.SystemNodeCreator; + +public class ManagementNodeCreator implements SystemNodeCreator +{ + @Override + public void register(final SystemNodeRegistry registry) + { + ManagementNode managementNode = new ManagementNode(registry,registry.getVirtualHostModel()); + registry.registerSystemNode(managementNode); + } + + @Override + public String getType() + { + return "AMQP-VIRTUALHOST-MANAGEMENT"; + } +} diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java new file mode 100644 index 0000000000..e25f327420 --- /dev/null +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -0,0 +1,220 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.management.amqp; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.StateChangeListener; + +class ManagementResponse implements MessageInstance<ManagementResponse,ManagementNodeConsumer> +{ + private final ManagementNodeConsumer _consumer; + private int _deliveryCount; + private boolean _isRedelivered; + private boolean _isDelivered; + private boolean _isDeleted; + private InternalMessage _message; + + ManagementResponse(final ManagementNodeConsumer consumer, final InternalMessage message) + { + _consumer = consumer; + _message = message; + } + + @Override + public int getDeliveryCount() + { + return 0; + } + + @Override + public void incrementDeliveryCount() + { + _deliveryCount++; + } + + @Override + public void decrementDeliveryCount() + { + _deliveryCount--; + } + + @Override + public void addStateChangeListener(final StateChangeListener<? super ManagementResponse, State> listener) + { + + } + + @Override + public boolean removeStateChangeListener(final StateChangeListener<? super ManagementResponse, State> listener) + { + return false; + } + + + @Override + public boolean acquiredByConsumer() + { + return !isDeleted(); + } + + @Override + public boolean isAcquiredBy(final ManagementNodeConsumer consumer) + { + return consumer == _consumer && !isDeleted(); + } + + @Override + public void setRedelivered() + { + _isRedelivered = true; + } + + @Override + public boolean isRedelivered() + { + return _isRedelivered; + } + + @Override + public ManagementNodeConsumer getDeliveredConsumer() + { + return isDeleted() ? null : _consumer; + } + + @Override + public void reject() + { + delete(); + } + + @Override + public boolean isRejectedBy(final ManagementNodeConsumer consumer) + { + return false; + } + + @Override + public boolean getDeliveredToConsumer() + { + return _isDelivered; + } + + @Override + public boolean expired() throws AMQException + { + return false; + } + + @Override + public boolean acquire(final ManagementNodeConsumer sub) + { + return false; + } + + @Override + public int getMaximumDeliveryCount() + { + return 0; + } + + @Override + public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, + final ServerTransaction txn) + { + return 0; + } + + + @Override + public Filterable asFilterable() + { + return null; + } + + @Override + public boolean isAvailable() + { + return false; + } + + @Override + public boolean acquire() + { + return false; + } + + @Override + public boolean isAcquired() + { + return !isDeleted(); + } + + @Override + public void release() + { + delete(); + } + + @Override + public boolean resend() throws AMQException + { + return false; + } + + @Override + public void delete() + { + // TODO + } + + @Override + public boolean isDeleted() + { + return _isDeleted; + } + + @Override + public ServerMessage getMessage() + { + return _message; + } + + @Override + public InstanceProperties getInstanceProperties() + { + return InstanceProperties.EMPTY; + } + + @Override + public TransactionLogResource getOwningResource() + { + return _consumer.getManagementNode(); + } +} diff --git a/qpid/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator b/qpid/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator new file mode 100644 index 0000000000..767f93782b --- /dev/null +++ b/qpid/java/broker-plugins/management-amqp/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.SystemNodeCreator @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.management.amqp.ManagementNodeCreator diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java index 4d85d52997..0329379713 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageContentServlet.java @@ -19,9 +19,6 @@ package org.apache.qpid.server.management.plugin.servlet.rest; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html index 9a24e23407..410a5d23ca 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html @@ -35,16 +35,16 @@ <tr> <td valign="top"><strong>Queue Type: </strong></td> <td> - <input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" /> + <input type="radio" id="formAddQueueTypeStandard" name="queueType" value="standard" checked="checked" dojoType="dijit.form.RadioButton" /> <label for="formAddQueueTypeStandard">Standard</label> - <input type="radio" id="formAddQueueTypePriority" name="type" value="priority" dojoType="dijit.form.RadioButton" /> + <input type="radio" id="formAddQueueTypePriority" name="queueType" value="priority" dojoType="dijit.form.RadioButton" /> <label for="formAddQueueTypePriority">Priority</label> - <input type="radio" id="formAddQueueTypeLVQ" name="type" value="lvq" dojoType="dijit.form.RadioButton" /> + <input type="radio" id="formAddQueueTypeLVQ" name="queueType" value="lvq" dojoType="dijit.form.RadioButton" /> <label for="formAddQueueTypeLVQ">LVQ</label> - <input type="radio" id="formAddQueueTypeSorted" name="type" value="sorted" dojoType="dijit.form.RadioButton" /> + <input type="radio" id="formAddQueueTypeSorted" name="queueType" value="sorted" dojoType="dijit.form.RadioButton" /> <label for="formAddQueueTypeSorted">Sorted</label> </td> </tr> diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js index 54cfe225c8..606f95e349 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js @@ -277,7 +277,7 @@ define(["dojo/_base/xhr", "exclusive", "owner", "lifetimePolicy", - "type", + "queueType", "typeQualifier", "alertRepeatGap", "alertRepeatGapUnits", @@ -359,14 +359,14 @@ define(["dojo/_base/xhr", bytesDepth = formatter.formatBytes( this.queueData["unacknowledgedBytes"] ); this.unacknowledgedBytes.innerHTML = "(" + bytesDepth.value; this.unacknowledgedBytesUnits.innerHTML = bytesDepth.units + ")"; - this.type.innerHTML = entities.encode(this.queueData[ "type" ]); - if (this.queueData.type == "standard") + this.queueType.innerHTML = entities.encode(this.queueData[ "queueType" ]); + if (this.queueData.queueType == "standard") { this.typeQualifier.style.display = "none"; } else { - this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData.type] + ": " + this.queueData[queueTypeKeys[this.queueData.type]] + ")"); + this.typeQualifier.innerHTML = entities.encode("(" + queueTypeKeyNames[this.queueData.queueType] + ": " + this.queueData[queueTypeKeys[this.queueData.queueType]] + ")"); } if(this.queueData["messageGroupKey"]) diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js index e8b8dd1721..64e821b2dd 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js @@ -96,7 +96,7 @@ define(["dojo/_base/xhr", } } else if (!typeSpecificFields.hasOwnProperty(propName) || - formValues.type === typeSpecificFields[ propName ]) { + formValues.queueType === typeSpecificFields[ propName ]) { if(formValues[ propName ] !== "") { if (fieldConverters.hasOwnProperty(propName)) { @@ -130,7 +130,7 @@ define(["dojo/_base/xhr", theForm = registry.byId("formAddQueue"); array.forEach(theForm.getDescendants(), function(widget) { - if(widget.name === "type") { + if(widget.name === "queueType") { widget.on("change", function(isChecked) { var objId = widget.id + ":fields"; diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html index 89b7327957..252deb3100 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html @@ -47,7 +47,7 @@ <div style="clear:both"> <div class="formLabel-labelCell" style="float:left; width: 150px;">Type:</div> <div style="float:left;"> - <span class="type"></span> + <span class="queueType"></span> <span class="typeQualifier"></span> </div> </div> diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index 49cb6aa8cb..d74aa41244 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -183,7 +183,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN @Override public String getQueueType() { - return (String) _queue.getAttribute(Queue.TYPE); + return (String) _queue.getAttribute(Queue.QUEUE_TYPE); } public boolean isDurable() diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index 3711a90f3b..f94c206512 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -127,7 +127,7 @@ public class QueueMBeanTest extends QpidTestCase public void testQueueType() throws Exception { - assertAttribute("queueType", QUEUE_TYPE, Queue.TYPE); + assertAttribute("queueType", QUEUE_TYPE, Queue.QUEUE_TYPE); } public void testMaximumDeliveryCount() throws Exception |
