summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java586
1 files changed, 0 insertions, 586 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
deleted file mode 100644
index 27ab580642..0000000000
--- a/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.qmf;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.ExchangeConfigType;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeReferrer;
-import org.apache.qpid.server.exchange.ExchangeType;
-import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
-import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
-import org.apache.qpid.server.exchange.topic.TopicNormalizer;
-import org.apache.qpid.server.exchange.topic.TopicParser;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.virtualhost.HouseKeepingTask;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ManagementExchange implements Exchange, QMFService.Listener
-{
- private static final AMQShortString QPID_MANAGEMENT = new AMQShortString("qpid.management");
- private static final AMQShortString QPID_MANAGEMENT_TYPE = new AMQShortString("management");
-
- private VirtualHost _virtualHost;
-
- private final TopicParser _parser = new TopicParser();
-
- private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
- new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
-
- private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>();
- private UUID _id;
- private UUID _qmfId;
- private static final String AGENT_BANK = "0";
-
- private int _bindingCountHigh;
- private final AtomicLong _msgReceived = new AtomicLong();
- private final AtomicLong _bytesReceived = new AtomicLong();
-
- private final CopyOnWriteArrayList<BindingListener> _listeners = new CopyOnWriteArrayList<Exchange.BindingListener>();
-
- //TODO : persist creation time
- private long _createTime = System.currentTimeMillis();
-
-
- private class ManagementQueue implements BaseQueue
- {
- private final UUID QUEUE_ID = UUIDGenerator.generateRandomUUID();
- private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + QUEUE_ID.toString();
- private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING);
-
- public void enqueue(ServerMessage message) throws AMQException
- {
- long size = message.getSize();
-
- ByteBuffer buf = ByteBuffer.allocate((int) size);
-
- int offset = 0;
-
- while(offset < size)
- {
- offset += message.getContent(buf,offset);
- }
-
- buf.flip();
- QMFCommandDecoder commandDecoder = new QMFCommandDecoder(getQMFService(),buf);
- QMFCommand cmd;
- while((cmd = commandDecoder.decode()) != null)
- {
- cmd.process(_virtualHost, message);
- }
-
- }
-
- public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
- {
- enqueue(message);
- }
-
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
- {
- enqueue(message);
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
- public AMQShortString getNameShortString()
- {
- return NAME_AS_SHORT_STRING;
- }
-
- @Override
- public UUID getId()
- {
- return QUEUE_ID;
- }
- }
-
-
- private final ManagementQueue _mgmtQueue = new ManagementQueue();
-
- public ManagementExchange()
- {
- }
-
- public static final ExchangeType<ManagementExchange> TYPE = new ExchangeType<ManagementExchange>()
- {
-
- public AMQShortString getName()
- {
- return QPID_MANAGEMENT_TYPE;
- }
-
- public Class<ManagementExchange> getExchangeClass()
- {
- return ManagementExchange.class;
- }
-
- public ManagementExchange newInstance(UUID id, VirtualHost host,
- AMQShortString name,
- boolean durable,
- int ticket,
- boolean autoDelete) throws AMQException
- {
- ManagementExchange exch = new ManagementExchange();
- exch.initialise(id, host, name, durable, ticket, autoDelete);
- return exch;
- }
-
- public AMQShortString getDefaultExchangeName()
- {
- return QPID_MANAGEMENT;
- }
- };
-
-
- public AMQShortString getNameShortString()
- {
- return QPID_MANAGEMENT;
- }
-
- public AMQShortString getTypeShortString()
- {
- return QPID_MANAGEMENT_TYPE;
- }
-
- public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
- throws AMQException
- {
- if(!QPID_MANAGEMENT.equals(name))
- {
- throw new AMQException("Can't create more than one Management exchange");
- }
- _virtualHost = host;
- _id = id;
- _virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost));
- _qmfId = getConfigStore().createId();
- getConfigStore().addConfiguredObject(this);
- getQMFService().addListener(this);
- }
-
- public UUID getId()
- {
- return _id;
- }
-
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
- public ExchangeConfigType getConfigType()
- {
- return ExchangeConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return _virtualHost;
- }
-
- public boolean isDurable()
- {
- return true;
- }
-
- public VirtualHost getVirtualHost()
- {
- return _virtualHost;
- }
-
- public String getName()
- {
- return QPID_MANAGEMENT.toString();
- }
-
- public ExchangeType getType()
- {
- return TYPE;
- }
-
- public boolean isAutoDelete()
- {
- return false;
- }
-
- public int getTicket()
- {
- return 0;
- }
-
- public void close() throws AMQException
- {
- getConfigStore().removeConfiguredObject(this);
- }
-
- public ConfigStore getConfigStore()
- {
- return getVirtualHost().getConfigStore();
- }
-
- public synchronized void addBinding(final Binding b)
- {
-
- if(_bindingSet.add(b))
- {
- AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey()));
-
- TopicExchangeResult result = _topicExchangeResults.get(routingKey);
- if(result == null)
- {
- result = new TopicExchangeResult();
- result.addUnfilteredQueue(b.getQueue());
- _parser.addBinding(routingKey, result);
- _topicExchangeResults.put(routingKey,result);
- }
- else
- {
- result.addUnfilteredQueue(b.getQueue());
- }
-
- result.addBinding(b);
- }
-
- for(BindingListener listener : _listeners)
- {
- listener.bindingAdded(this, b);
- }
-
- if(_bindingSet.size() > _bindingCountHigh)
- {
- _bindingCountHigh = _bindingSet.size();
- }
-
- String bindingKey = b.getBindingKey();
-
- if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
- {
- publishAllSchema();
- }
- if(bindingKey.startsWith("console.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#."))
- {
- publishAllConsole();
- }
-
- }
-
- void publishAllConsole()
- {
- QMFService qmfService = getQMFService();
-
- long sampleTime = System.currentTimeMillis();
-
- for(QMFPackage pkg : qmfService.getSupportedSchemas())
- {
- for(QMFClass qmfClass : pkg.getClasses())
- {
- Collection<QMFObject> qmfObjects = qmfService.getObjects(qmfClass);
-
- publishObjectsToConsole(sampleTime, qmfObjects);
- }
-
- }
-
- }
-
- private QMFService getQMFService()
- {
- return _virtualHost.getApplicationRegistry().getQMFService();
- }
-
- void publishObjectsToConsole(final long sampleTime,
- final Collection<QMFObject> qmfObjects)
- {
- if(!qmfObjects.isEmpty() && hasBindings())
- {
- QMFClass qmfClass = qmfObjects.iterator().next().getQMFClass();
- ArrayList<QMFCommand> commands = new ArrayList<QMFCommand>();
-
-
- for(QMFObject obj : qmfObjects)
- {
- commands.add(obj.asConfigInfoCmd(sampleTime));
- commands.add(obj.asInstrumentInfoCmd(sampleTime));
- }
-
- publishToConsole(qmfClass, commands);
- }
- }
-
- private void publishToConsole(final QMFClass qmfClass, final ArrayList<QMFCommand> commands)
- {
- if(!commands.isEmpty() && hasBindings())
- {
- String routingKey = "console.obj.1." + AGENT_BANK + "." + qmfClass.getPackage().getName() + "." + qmfClass.getName();
- QMFMessage message = new QMFMessage(routingKey,commands.toArray(new QMFCommand[commands.size()]));
-
- Collection<TopicMatcherResult> results = _parser.parse(new AMQShortString(routingKey));
- HashSet<AMQQueue> queues = new HashSet<AMQQueue>();
- for(TopicMatcherResult result : results)
- {
- TopicExchangeResult res = (TopicExchangeResult)result;
-
- for(Binding b : res.getBindings())
- {
- b.incrementMatches();
- }
-
- queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues());
- }
- for(AMQQueue queue : queues)
- {
- try
- {
- queue.enqueue(message);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- void publishAllSchema()
- {
-
- }
-
- public synchronized void removeBinding(final Binding binding)
- {
- if(_bindingSet.remove(binding))
- {
- AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
- TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
- result.removeBinding(binding);
- result.removeUnfilteredQueue(binding.getQueue());
- }
-
- for(BindingListener listener : _listeners)
- {
- listener.bindingRemoved(this, binding);
- }
- }
-
- public synchronized Collection<Binding> getBindings()
- {
- return new ArrayList<Binding>(_bindingSet);
- }
-
- public ArrayList<BaseQueue> route(InboundMessage message)
- {
- ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(1);
- _msgReceived.incrementAndGet();
- _bytesReceived.addAndGet(message.getSize());
- queues.add(_mgmtQueue);
- return queues;
- }
-
- public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
- {
- return false; //TODO
- }
-
- public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isBound(AMQShortString routingKey, AMQQueue queue)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isBound(AMQShortString routingKey)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isBound(AMQQueue queue)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean hasBindings()
- {
- return !_bindingSet.isEmpty();
- }
-
- public boolean isBound(String bindingKey, AMQQueue queue)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isBound(String bindingKey)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void addCloseTask(final Task task)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void removeCloseTask(final Task task)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
-
- public Exchange getAlternateExchange()
- {
- return null;
- }
-
- public Map<String, Object> getArguments()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setAlternateExchange(Exchange exchange)
- {
-
- }
-
- public void removeReference(ExchangeReferrer exchange)
- {
- }
-
- public void addReference(ExchangeReferrer exchange)
- {
- }
-
- public boolean hasReferrers()
- {
- return true;
- }
-
-
-
- private class UpdateTask extends HouseKeepingTask
- {
- public UpdateTask(VirtualHost vhost)
- {
- super(vhost);
- }
-
- public void execute()
- {
- publishAllConsole();
- publishAllSchema();
- }
-
- }
-
- public void objectCreated(final QMFObject obj)
- {
- publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj));
- }
-
- public void objectDeleted(final QMFObject obj)
- {
- publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj));
- }
-
- public long getBindingCount()
- {
- return getBindings().size();
- }
-
- public long getBindingCountHigh()
- {
- return _bindingCountHigh;
- }
-
- public long getMsgReceives()
- {
- return _msgReceived.get();
- }
-
- public long getMsgRoutes()
- {
- return getMsgReceives();
- }
-
- public long getMsgDrops()
- {
- return 0l;
- }
-
- public long getByteReceives()
- {
- return _bytesReceived.get();
- }
-
- public long getByteRoutes()
- {
- return getByteReceives();
- }
-
- public long getByteDrops()
- {
- return 0l;
- }
-
- public long getCreateTime()
- {
- return _createTime;
- }
-
- public void addBindingListener(final BindingListener listener)
- {
- _listeners.add(listener);
- }
-
- public void removeBindingListener(final BindingListener listener)
- {
- _listeners.remove(listener);
- }
-
-}