diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java | 586 |
1 files changed, 0 insertions, 586 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java deleted file mode 100644 index 27ab580642..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java +++ /dev/null @@ -1,586 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.qmf; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; -import org.apache.qpid.server.configuration.ExchangeConfigType; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.ExchangeReferrer; -import org.apache.qpid.server.exchange.ExchangeType; -import org.apache.qpid.server.exchange.topic.TopicExchangeResult; -import org.apache.qpid.server.exchange.topic.TopicMatcherResult; -import org.apache.qpid.server.exchange.topic.TopicNormalizer; -import org.apache.qpid.server.exchange.topic.TopicParser; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.virtualhost.HouseKeepingTask; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicLong; - -public class ManagementExchange implements Exchange, QMFService.Listener -{ - private static final AMQShortString QPID_MANAGEMENT = new AMQShortString("qpid.management"); - private static final AMQShortString QPID_MANAGEMENT_TYPE = new AMQShortString("management"); - - private VirtualHost _virtualHost; - - private final TopicParser _parser = new TopicParser(); - - private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults = - new ConcurrentHashMap<AMQShortString, TopicExchangeResult>(); - - private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>(); - private UUID _id; - private UUID _qmfId; - private static final String AGENT_BANK = "0"; - - private int _bindingCountHigh; - private final AtomicLong _msgReceived = new AtomicLong(); - private final AtomicLong _bytesReceived = new AtomicLong(); - - private final CopyOnWriteArrayList<BindingListener> _listeners = new CopyOnWriteArrayList<Exchange.BindingListener>(); - - //TODO : persist creation time - private long _createTime = System.currentTimeMillis(); - - - private class ManagementQueue implements BaseQueue - { - private final UUID QUEUE_ID = UUIDGenerator.generateRandomUUID(); - private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + QUEUE_ID.toString(); - private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING); - - public void enqueue(ServerMessage message) throws AMQException - { - long size = message.getSize(); - - ByteBuffer buf = ByteBuffer.allocate((int) size); - - int offset = 0; - - while(offset < size) - { - offset += message.getContent(buf,offset); - } - - buf.flip(); - QMFCommandDecoder commandDecoder = new QMFCommandDecoder(getQMFService(),buf); - QMFCommand cmd; - while((cmd = commandDecoder.decode()) != null) - { - cmd.process(_virtualHost, message); - } - - } - - public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException - { - enqueue(message); - } - - public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException - { - enqueue(message); - } - - public boolean isDurable() - { - return false; - } - - public AMQShortString getNameShortString() - { - return NAME_AS_SHORT_STRING; - } - - @Override - public UUID getId() - { - return QUEUE_ID; - } - } - - - private final ManagementQueue _mgmtQueue = new ManagementQueue(); - - public ManagementExchange() - { - } - - public static final ExchangeType<ManagementExchange> TYPE = new ExchangeType<ManagementExchange>() - { - - public AMQShortString getName() - { - return QPID_MANAGEMENT_TYPE; - } - - public Class<ManagementExchange> getExchangeClass() - { - return ManagementExchange.class; - } - - public ManagementExchange newInstance(UUID id, VirtualHost host, - AMQShortString name, - boolean durable, - int ticket, - boolean autoDelete) throws AMQException - { - ManagementExchange exch = new ManagementExchange(); - exch.initialise(id, host, name, durable, ticket, autoDelete); - return exch; - } - - public AMQShortString getDefaultExchangeName() - { - return QPID_MANAGEMENT; - } - }; - - - public AMQShortString getNameShortString() - { - return QPID_MANAGEMENT; - } - - public AMQShortString getTypeShortString() - { - return QPID_MANAGEMENT_TYPE; - } - - public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) - throws AMQException - { - if(!QPID_MANAGEMENT.equals(name)) - { - throw new AMQException("Can't create more than one Management exchange"); - } - _virtualHost = host; - _id = id; - _virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost)); - _qmfId = getConfigStore().createId(); - getConfigStore().addConfiguredObject(this); - getQMFService().addListener(this); - } - - public UUID getId() - { - return _id; - } - - @Override - public UUID getQMFId() - { - return _qmfId; - } - - public ExchangeConfigType getConfigType() - { - return ExchangeConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return _virtualHost; - } - - public boolean isDurable() - { - return true; - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public String getName() - { - return QPID_MANAGEMENT.toString(); - } - - public ExchangeType getType() - { - return TYPE; - } - - public boolean isAutoDelete() - { - return false; - } - - public int getTicket() - { - return 0; - } - - public void close() throws AMQException - { - getConfigStore().removeConfiguredObject(this); - } - - public ConfigStore getConfigStore() - { - return getVirtualHost().getConfigStore(); - } - - public synchronized void addBinding(final Binding b) - { - - if(_bindingSet.add(b)) - { - AMQShortString routingKey = TopicNormalizer.normalize(new AMQShortString(b.getBindingKey())); - - TopicExchangeResult result = _topicExchangeResults.get(routingKey); - if(result == null) - { - result = new TopicExchangeResult(); - result.addUnfilteredQueue(b.getQueue()); - _parser.addBinding(routingKey, result); - _topicExchangeResults.put(routingKey,result); - } - else - { - result.addUnfilteredQueue(b.getQueue()); - } - - result.addBinding(b); - } - - for(BindingListener listener : _listeners) - { - listener.bindingAdded(this, b); - } - - if(_bindingSet.size() > _bindingCountHigh) - { - _bindingCountHigh = _bindingSet.size(); - } - - String bindingKey = b.getBindingKey(); - - if(bindingKey.startsWith("schema.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#.")) - { - publishAllSchema(); - } - if(bindingKey.startsWith("console.") || bindingKey.startsWith("*.") || bindingKey.startsWith("#.")) - { - publishAllConsole(); - } - - } - - void publishAllConsole() - { - QMFService qmfService = getQMFService(); - - long sampleTime = System.currentTimeMillis(); - - for(QMFPackage pkg : qmfService.getSupportedSchemas()) - { - for(QMFClass qmfClass : pkg.getClasses()) - { - Collection<QMFObject> qmfObjects = qmfService.getObjects(qmfClass); - - publishObjectsToConsole(sampleTime, qmfObjects); - } - - } - - } - - private QMFService getQMFService() - { - return _virtualHost.getApplicationRegistry().getQMFService(); - } - - void publishObjectsToConsole(final long sampleTime, - final Collection<QMFObject> qmfObjects) - { - if(!qmfObjects.isEmpty() && hasBindings()) - { - QMFClass qmfClass = qmfObjects.iterator().next().getQMFClass(); - ArrayList<QMFCommand> commands = new ArrayList<QMFCommand>(); - - - for(QMFObject obj : qmfObjects) - { - commands.add(obj.asConfigInfoCmd(sampleTime)); - commands.add(obj.asInstrumentInfoCmd(sampleTime)); - } - - publishToConsole(qmfClass, commands); - } - } - - private void publishToConsole(final QMFClass qmfClass, final ArrayList<QMFCommand> commands) - { - if(!commands.isEmpty() && hasBindings()) - { - String routingKey = "console.obj.1." + AGENT_BANK + "." + qmfClass.getPackage().getName() + "." + qmfClass.getName(); - QMFMessage message = new QMFMessage(routingKey,commands.toArray(new QMFCommand[commands.size()])); - - Collection<TopicMatcherResult> results = _parser.parse(new AMQShortString(routingKey)); - HashSet<AMQQueue> queues = new HashSet<AMQQueue>(); - for(TopicMatcherResult result : results) - { - TopicExchangeResult res = (TopicExchangeResult)result; - - for(Binding b : res.getBindings()) - { - b.incrementMatches(); - } - - queues.addAll(((TopicExchangeResult)result).getUnfilteredQueues()); - } - for(AMQQueue queue : queues) - { - try - { - queue.enqueue(message); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - } - } - - void publishAllSchema() - { - - } - - public synchronized void removeBinding(final Binding binding) - { - if(_bindingSet.remove(binding)) - { - AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey())); - TopicExchangeResult result = _topicExchangeResults.get(bindingKey); - result.removeBinding(binding); - result.removeUnfilteredQueue(binding.getQueue()); - } - - for(BindingListener listener : _listeners) - { - listener.bindingRemoved(this, binding); - } - } - - public synchronized Collection<Binding> getBindings() - { - return new ArrayList<Binding>(_bindingSet); - } - - public ArrayList<BaseQueue> route(InboundMessage message) - { - ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(1); - _msgReceived.incrementAndGet(); - _bytesReceived.addAndGet(message.getSize()); - queues.add(_mgmtQueue); - return queues; - } - - public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue) - { - return false; //TODO - } - - public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isBound(AMQShortString routingKey, AMQQueue queue) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isBound(AMQShortString routingKey) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isBound(AMQQueue queue) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean hasBindings() - { - return !_bindingSet.isEmpty(); - } - - public boolean isBound(String bindingKey, AMQQueue queue) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isBound(String bindingKey) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public void addCloseTask(final Task task) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void removeCloseTask(final Task task) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - - - public Exchange getAlternateExchange() - { - return null; - } - - public Map<String, Object> getArguments() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void setAlternateExchange(Exchange exchange) - { - - } - - public void removeReference(ExchangeReferrer exchange) - { - } - - public void addReference(ExchangeReferrer exchange) - { - } - - public boolean hasReferrers() - { - return true; - } - - - - private class UpdateTask extends HouseKeepingTask - { - public UpdateTask(VirtualHost vhost) - { - super(vhost); - } - - public void execute() - { - publishAllConsole(); - publishAllSchema(); - } - - } - - public void objectCreated(final QMFObject obj) - { - publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj)); - } - - public void objectDeleted(final QMFObject obj) - { - publishObjectsToConsole(System.currentTimeMillis(), Collections.singleton(obj)); - } - - public long getBindingCount() - { - return getBindings().size(); - } - - public long getBindingCountHigh() - { - return _bindingCountHigh; - } - - public long getMsgReceives() - { - return _msgReceived.get(); - } - - public long getMsgRoutes() - { - return getMsgReceives(); - } - - public long getMsgDrops() - { - return 0l; - } - - public long getByteReceives() - { - return _bytesReceived.get(); - } - - public long getByteRoutes() - { - return getByteReceives(); - } - - public long getByteDrops() - { - return 0l; - } - - public long getCreateTime() - { - return _createTime; - } - - public void addBindingListener(final BindingListener listener) - { - _listeners.add(listener); - } - - public void removeBindingListener(final BindingListener listener) - { - _listeners.remove(listener); - } - -} |