summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-08 13:15:35 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-08 13:15:35 +0000
commit8468c9b9b067489aef2227456f662bfdbdc71272 (patch)
tree370659fd9ea9784d6e608f049a870433aa821353 /qpid/java/broker-plugins
parente7375322dc1083dbfffe49a903d4737a6943907e (diff)
downloadqpid-python-8468c9b9b067489aef2227456f662bfdbdc71272.tar.gz
QPID-3978 : [Java Broker] Allow for acquired messages to be removed from a queue due to TTL or management actions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616742 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java18
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java8
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java6
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java35
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java15
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java28
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java18
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java20
11 files changed, 122 insertions, 57 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index d73d019000..7ab3fbb1f5 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -534,15 +534,25 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
return _stopped.get();
}
- public void acknowledge(MessageInstance entry)
+ public boolean deleteAcquired(MessageInstance entry)
{
- // TODO Fix Store Context / cleanup
if(entry.isAcquiredBy(getConsumer()))
{
- _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
- _unacknowledgedCount.decrementAndGet();
+ acquisitionRemoved(entry);
entry.delete();
+ return true;
}
+ else
+ {
+ return false;
+ }
+ }
+
+ @Override
+ public void acquisitionRemoved(final MessageInstance entry)
+ {
+ _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
+ _unacknowledgedCount.decrementAndGet();
}
public void flush()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
index 4420709a91..94f04bbae3 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
@@ -41,13 +41,13 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()) && _entry.lockAcquisition())
{
_target.getSessionModel().acknowledge(_target, _entry);
}
else
{
- _logger.warn("MessageAccept received for message which has not been acquired (likely client error)");
+ _logger.info("MessageAccept received for message which is not been acquired - message may have expired or been removed");
}
}
@@ -60,7 +60,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
}
else
{
- _logger.warn("MessageRelease received for message which has not been acquired (likely client error)");
+ _logger.warn("MessageRelease received for message which has not been acquired - message may have expired or been removed");
}
}
@@ -72,7 +72,7 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
}
else
{
- _logger.warn("MessageReject received for message which has not been acquired (likely client error)");
+ _logger.warn("MessageReject received for message which has not been acquired - message may have expired or been removed");
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
index cd1146ac0b..7917b7989a 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
@@ -29,6 +29,7 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
+ private long _messageSize;
private boolean _restoreCredit;
public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
@@ -38,15 +39,19 @@ public class MessageAcceptCompletionListener implements Method.CompletionListene
_entry = entry;
_session = session;
_restoreCredit = restoreCredit;
+ if(restoreCredit)
+ {
+ _messageSize = entry.getMessage().getSize();
+ }
}
public void onComplete(Method method)
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry.getMessage());
+ _sub.getCreditManager().restoreCredit(1l, _messageSize);
}
- if(_entry.isAcquiredBy(_sub.getConsumer()))
+ if(_entry.isAcquiredBy(_sub.getConsumer()) && _entry.lockAcquisition())
{
_session.acknowledge(_sub, _entry);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 3fe1515b18..b1c22fe823 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -460,7 +460,7 @@ public class ServerSession extends Session
public void postCommit()
{
- sub.acknowledge(entry);
+ sub.deleteAcquired(entry);
}
public void onRollback()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b6e1b7dd6a..7877812d84 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1413,7 +1413,11 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
// explicit rollbacks resend the message after the rollback-ok is sent
if(_rollingBack)
{
- _resendList.addAll(_ackedMessages);
+ for(MessageInstance entry : _ackedMessages)
+ {
+ entry.unlockAcquisition();
+ }
+ _resendList.addAll(_ackedMessages);
}
else
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 3de89a1d70..7303ef30da 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -20,11 +20,16 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
@@ -34,14 +39,10 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
* that was given out by the broker and the channel id. <p/>
@@ -57,7 +58,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
final MessageInstance.State oldSate,
final MessageInstance.State newState)
{
- if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
+ if (oldSate == QueueEntry.State.ACQUIRED && newState != QueueEntry.State.ACQUIRED)
{
restoreCredit(entry.getMessage());
}
@@ -74,8 +75,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
- AMQShortString consumerTag, FieldTable filters,
- FlowCreditManager creditManager) throws AMQException
+ AMQShortString consumerTag, FieldTable filters,
+ FlowCreditManager creditManager) throws AMQException
{
return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
}
@@ -557,6 +558,11 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
});
}
+ @Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+ }
+
public long getUnacknowledgedBytes()
{
return _unacknowledgedBytes.longValue();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
index 8d70e769d3..1bd9ab079e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
@@ -20,31 +20,28 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
-
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
private final Object _lock = new Object();
- private long _unackedSize;
-
private Map<Long, MessageInstance> _map;
- private long _lastDeliveryTag;
-
private final int _prefetchLimit;
public UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
+ _map = new LinkedHashMap<>(prefetchLimit);
}
public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
@@ -81,12 +78,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
MessageInstance message = _map.remove(deliveryTag);
- if(message != null)
- {
- _unackedSize -= message.getMessage().getSize();
-
- }
-
return message;
}
}
@@ -109,8 +100,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
_map.put(deliveryTag, message);
- _unackedSize += message.getMessage().getSize();
- _lastDeliveryTag = deliveryTag;
}
}
@@ -119,8 +108,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
Collection<MessageInstance> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
- _unackedSize = 0l;
+ _map = new LinkedHashMap<>(_prefetchLimit);
return currentEntries;
}
}
@@ -138,7 +126,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
synchronized (_lock)
{
_map.clear();
- _unackedSize = 0l;
}
}
@@ -163,6 +150,14 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
collect(deliveryTag, multiple, ackedMessageMap);
remove(ackedMessageMap);
+ List<MessageInstance> acknowledged = new ArrayList<>();
+ for(MessageInstance instance : ackedMessageMap.values())
+ {
+ if(instance.lockAcquisition())
+ {
+ acknowledged.add(instance);
+ }
+ }
return ackedMessageMap.values();
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index adb2f8ea6a..bceae85896 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.nio.ByteBuffer;
+import java.util.List;
+
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
@@ -37,19 +40,16 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
private final boolean _acquires;
@@ -378,6 +378,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
if(outcome instanceof Accepted)
{
+ _queueEntry.lockAcquisition();
txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
new ServerTransaction.Action()
{
@@ -412,6 +413,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
modified.setDeliveryFailed(true);
_link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
_link.getEndpoint().sendFlowConditional();
+ _queueEntry.unlockAcquisition();
}
}
});
@@ -498,6 +500,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
@Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+ }
+
+ @Override
public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 7a844cbc79..a8fc5387b4 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -636,19 +636,21 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
if(_consumer.acquires())
{
- txn.dequeue(Collections.singleton(queueEntry),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- queueEntry.delete();
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
+ if(queueEntry.acquire() || queueEntry.isAcquired())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.delete();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
}
}
else if(outcome instanceof Released)
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index e73d177599..34f08615ad 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1071,6 +1071,12 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return false;
+ }
+
+ @Override
public void setRedelivered()
{
@@ -1119,6 +1125,18 @@ class ManagementNode implements MessageSource, MessageDestination
}
@Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
public int getMaximumDeliveryCount()
{
return 0;
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index ae2828d392..03e7eab61b 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -90,6 +90,12 @@ class ManagementResponse implements MessageInstance
}
@Override
+ public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+ {
+ return consumer == _consumer;
+ }
+
+ @Override
public void setRedelivered()
{
_isRedelivered = true;
@@ -138,6 +144,18 @@ class ManagementResponse implements MessageInstance
}
@Override
+ public boolean lockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean unlockAcquisition()
+ {
+ return false;
+ }
+
+ @Override
public int getMaximumDeliveryCount()
{
return 0;
@@ -190,7 +208,7 @@ class ManagementResponse implements MessageInstance
@Override
public void delete()
{
- // TODO
+ _isDeleted = true;
}
@Override