diff options
Diffstat (limited to 'vendor/amqplib/client_0_8/channel.py')
-rw-r--r-- | vendor/amqplib/client_0_8/channel.py | 2602 |
1 files changed, 2602 insertions, 0 deletions
diff --git a/vendor/amqplib/client_0_8/channel.py b/vendor/amqplib/client_0_8/channel.py new file mode 100644 index 0000000000..8de0b220ac --- /dev/null +++ b/vendor/amqplib/client_0_8/channel.py @@ -0,0 +1,2602 @@ +""" +AMQP 0-8 Channels + +""" +# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 + +import logging +from Queue import Queue + +from abstract_channel import AbstractChannel +from exceptions import * +from serialization import AMQPWriter + +__all__ = [ + 'Channel', # here mainly so it shows in in pydoc + ] + +AMQP_LOGGER = logging.getLogger('amqplib') + + +class Channel(AbstractChannel): + """ + work with channels + + The channel class provides methods for a client to establish a + virtual connection - a channel - to a server and for both peers to + operate the virtual connection thereafter. + + GRAMMAR: + + channel = open-channel *use-channel close-channel + open-channel = C:OPEN S:OPEN-OK + use-channel = C:FLOW S:FLOW-OK + / S:FLOW C:FLOW-OK + / S:ALERT + / functional-class + close-channel = C:CLOSE S:CLOSE-OK + / S:CLOSE C:CLOSE-OK + + """ + def __init__(self, connection, channel_id=None, auto_decode=True): + """ + Create a channel bound to a connection and using the specified + numeric channel_id, and open on the server. + + The 'auto_decode' parameter (defaults to True), indicates + whether the library should attempt to decode the body + of Messages to a Unicode string if there's a 'content_encoding' + property for the message. If there's no 'content_encoding' + property, or the decode raises an Exception, the plain string + is left as the message body. + + """ + if channel_id is None: + channel_id = connection._get_free_channel_id() + AMQP_LOGGER.debug('using channel_id: %d' % channel_id) + + super(Channel, self).__init__(connection, channel_id) + + self.default_ticket = 0 + self.is_open = False + self.active = True # Flow control + self.alerts = Queue() + self.returned_messages = Queue() + self.callbacks = {} + self.auto_decode = auto_decode + + self._x_open() + + + def _do_close(self): + """ + Tear down this object, after we've agreed to close with the server. + + """ + AMQP_LOGGER.debug('Closed channel #%d' % self.channel_id) + self.is_open = False + del self.connection.channels[self.channel_id] + self.channel_id = self.connection = None + self.callbacks = {} + + + ################# + + def _alert(self, args): + """ + This method allows the server to send a non-fatal warning to + the client. This is used for methods that are normally + asynchronous and thus do not have confirmations, and for which + the server may detect errors that need to be reported. Fatal + errors are handled as channel or connection exceptions; non- + fatal errors are sent through this method. + + PARAMETERS: + reply_code: short + + The reply code. The AMQ reply codes are defined in AMQ + RFC 011. + + reply_text: shortstr + + The localised reply text. This text can be logged as an + aid to resolving issues. + + details: table + + detailed information for warning + + A set of fields that provide more information about + the problem. The meaning of these fields are defined + on a per-reply-code basis (TO BE DEFINED). + + """ + reply_code = args.read_short() + reply_text = args.read_shortstr() + details = args.read_table() + + self.alerts.put((reply_code, reply_text, details)) + + + def close(self, reply_code=0, reply_text='', method_sig=(0, 0)): + """ + request a channel close + + This method indicates that the sender wants to close the + channel. This may be due to internal conditions (e.g. a forced + shut-down) or due to an error handling a specific method, i.e. + an exception. When a close is due to an exception, the sender + provides the class and method id of the method which caused + the exception. + + RULE: + + After sending this method any received method except + Channel.Close-OK MUST be discarded. + + RULE: + + The peer sending this method MAY use a counter or timeout + to detect failure of the other peer to respond correctly + with Channel.Close-OK.. + + PARAMETERS: + reply_code: short + + The reply code. The AMQ reply codes are defined in AMQ + RFC 011. + + reply_text: shortstr + + The localised reply text. This text can be logged as an + aid to resolving issues. + + class_id: short + + failing method class + + When the close is provoked by a method exception, this + is the class of the method. + + method_id: short + + failing method ID + + When the close is provoked by a method exception, this + is the ID of the method. + + """ + if not self.is_open: + # already closed + return + + args = AMQPWriter() + args.write_short(reply_code) + args.write_shortstr(reply_text) + args.write_short(method_sig[0]) # class_id + args.write_short(method_sig[1]) # method_id + self._send_method((20, 40), args) + return self.wait(allowed_methods=[ + (20, 41), # Channel.close_ok + ]) + + + def _close(self, args): + """ + request a channel close + + This method indicates that the sender wants to close the + channel. This may be due to internal conditions (e.g. a forced + shut-down) or due to an error handling a specific method, i.e. + an exception. When a close is due to an exception, the sender + provides the class and method id of the method which caused + the exception. + + RULE: + + After sending this method any received method except + Channel.Close-OK MUST be discarded. + + RULE: + + The peer sending this method MAY use a counter or timeout + to detect failure of the other peer to respond correctly + with Channel.Close-OK.. + + PARAMETERS: + reply_code: short + + The reply code. The AMQ reply codes are defined in AMQ + RFC 011. + + reply_text: shortstr + + The localised reply text. This text can be logged as an + aid to resolving issues. + + class_id: short + + failing method class + + When the close is provoked by a method exception, this + is the class of the method. + + method_id: short + + failing method ID + + When the close is provoked by a method exception, this + is the ID of the method. + + """ + reply_code = args.read_short() + reply_text = args.read_shortstr() + class_id = args.read_short() + method_id = args.read_short() + +# self.close_ok() + + +# def close_ok(self): +# """ +# confirm a channel close +# +# This method confirms a Channel.Close method and tells the +# recipient that it is safe to release resources for the channel +# and close the socket. +# +# RULE: +# +# A peer that detects a socket closure without having +# received a Channel.Close-Ok handshake method SHOULD log +# the error. +# +# """ + self._send_method((20, 41)) + self._do_close() + + raise AMQPChannelException(reply_code, reply_text, + (class_id, method_id)) + + + def _close_ok(self, args): + """ + confirm a channel close + + This method confirms a Channel.Close method and tells the + recipient that it is safe to release resources for the channel + and close the socket. + + RULE: + + A peer that detects a socket closure without having + received a Channel.Close-Ok handshake method SHOULD log + the error. + + """ + self._do_close() + + + def flow(self, active): + """ + enable/disable flow from peer + + This method asks the peer to pause or restart the flow of + content data. This is a simple flow-control mechanism that a + peer can use to avoid oveflowing its queues or otherwise + finding itself receiving more messages than it can process. + Note that this method is not intended for window control. The + peer that receives a request to stop sending content should + finish sending the current content, if any, and then wait + until it receives a Flow restart method. + + RULE: + + When a new channel is opened, it is active. Some + applications assume that channels are inactive until + started. To emulate this behaviour a client MAY open the + channel, then pause it. + + RULE: + + When sending content data in multiple frames, a peer + SHOULD monitor the channel for incoming methods and + respond to a Channel.Flow as rapidly as possible. + + RULE: + + A peer MAY use the Channel.Flow method to throttle + incoming content data for internal reasons, for example, + when exchangeing data over a slower connection. + + RULE: + + The peer that requests a Channel.Flow method MAY + disconnect and/or ban a peer that does not respect the + request. + + PARAMETERS: + active: boolean + + start/stop content frames + + If True, the peer starts sending content frames. If + False, the peer stops sending content frames. + + """ + args = AMQPWriter() + args.write_bit(active) + self._send_method((20, 20), args) + return self.wait(allowed_methods=[ + (20, 21), # Channel.flow_ok + ]) + + + def _flow(self, args): + """ + enable/disable flow from peer + + This method asks the peer to pause or restart the flow of + content data. This is a simple flow-control mechanism that a + peer can use to avoid oveflowing its queues or otherwise + finding itself receiving more messages than it can process. + Note that this method is not intended for window control. The + peer that receives a request to stop sending content should + finish sending the current content, if any, and then wait + until it receives a Flow restart method. + + RULE: + + When a new channel is opened, it is active. Some + applications assume that channels are inactive until + started. To emulate this behaviour a client MAY open the + channel, then pause it. + + RULE: + + When sending content data in multiple frames, a peer + SHOULD monitor the channel for incoming methods and + respond to a Channel.Flow as rapidly as possible. + + RULE: + + A peer MAY use the Channel.Flow method to throttle + incoming content data for internal reasons, for example, + when exchangeing data over a slower connection. + + RULE: + + The peer that requests a Channel.Flow method MAY + disconnect and/or ban a peer that does not respect the + request. + + PARAMETERS: + active: boolean + + start/stop content frames + + If True, the peer starts sending content frames. If + False, the peer stops sending content frames. + + """ + self.active = args.read_bit() + + self._x_flow_ok(self.active) + + + def _x_flow_ok(self, active): + """ + confirm a flow method + + Confirms to the peer that a flow command was received and + processed. + + PARAMETERS: + active: boolean + + current flow setting + + Confirms the setting of the processed flow method: + True means the peer will start sending or continue + to send content frames; False means it will not. + + """ + args = AMQPWriter() + args.write_bit(active) + self._send_method((20, 21), args) + + + def _flow_ok(self, args): + """ + confirm a flow method + + Confirms to the peer that a flow command was received and + processed. + + PARAMETERS: + active: boolean + + current flow setting + + Confirms the setting of the processed flow method: + True means the peer will start sending or continue + to send content frames; False means it will not. + + """ + return args.read_bit() + + + def _x_open(self, out_of_band=''): + """ + open a channel for use + + This method opens a virtual connection (a channel). + + RULE: + + This method MUST NOT be called when the channel is already + open. + + PARAMETERS: + out_of_band: shortstr + + out-of-band settings + + Configures out-of-band transfers on this channel. The + syntax and meaning of this field will be formally + defined at a later date. + + """ + if self.is_open: + return + + args = AMQPWriter() + args.write_shortstr(out_of_band) + self._send_method((20, 10), args) + return self.wait(allowed_methods=[ + (20, 11), # Channel.open_ok + ]) + + + def _open_ok(self, args): + """ + signal that the channel is ready + + This method signals to the client that the channel is ready + for use. + + """ + self.is_open = True + AMQP_LOGGER.debug('Channel open') + + + ############# + # + # Access + # + # + # work with access tickets + # + # The protocol control access to server resources using access + # tickets. A client must explicitly request access tickets before + # doing work. An access ticket grants a client the right to use a + # specific set of resources - called a "realm" - in specific ways. + # + # GRAMMAR: + # + # access = C:REQUEST S:REQUEST-OK + # + # + + def access_request(self, realm, exclusive=False, + passive=False, active=False, write=False, read=False): + """ + request an access ticket + + This method requests an access ticket for an access realm. The + server responds by granting the access ticket. If the client + does not have access rights to the requested realm this causes + a connection exception. Access tickets are a per-channel + resource. + + RULE: + + The realm name MUST start with either "/data" (for + application resources) or "/admin" (for server + administration resources). If the realm starts with any + other path, the server MUST raise a connection exception + with reply code 403 (access refused). + + RULE: + + The server MUST implement the /data realm and MAY + implement the /admin realm. The mapping of resources to + realms is not defined in the protocol - this is a server- + side configuration issue. + + PARAMETERS: + realm: shortstr + + name of requested realm + + RULE: + + If the specified realm is not known to the server, + the server must raise a channel exception with + reply code 402 (invalid path). + + exclusive: boolean + + request exclusive access + + Request exclusive access to the realm. If the server + cannot grant this - because there are other active + tickets for the realm - it raises a channel exception. + + passive: boolean + + request passive access + + Request message passive access to the specified access + realm. Passive access lets a client get information + about resources in the realm but not to make any + changes to them. + + active: boolean + + request active access + + Request message active access to the specified access + realm. Acvtive access lets a client get create and + delete resources in the realm. + + write: boolean + + request write access + + Request write access to the specified access realm. + Write access lets a client publish messages to all + exchanges in the realm. + + read: boolean + + request read access + + Request read access to the specified access realm. + Read access lets a client consume messages from queues + in the realm. + + The most recently requested ticket is used as the channel's + default ticket for any method that requires a ticket. + + """ + args = AMQPWriter() + args.write_shortstr(realm) + args.write_bit(exclusive) + args.write_bit(passive) + args.write_bit(active) + args.write_bit(write) + args.write_bit(read) + self._send_method((30, 10), args) + return self.wait(allowed_methods=[ + (30, 11), # Channel.access_request_ok + ]) + + + def _access_request_ok(self, args): + """ + grant access to server resources + + This method provides the client with an access ticket. The + access ticket is valid within the current channel and for the + lifespan of the channel. + + RULE: + + The client MUST NOT use access tickets except within the + same channel as originally granted. + + RULE: + + The server MUST isolate access tickets per channel and + treat an attempt by a client to mix these as a connection + exception. + + PARAMETERS: + ticket: short + + """ + self.default_ticket = args.read_short() + return self.default_ticket + + + ############# + # + # Exchange + # + # + # work with exchanges + # + # Exchanges match and distribute messages across queues. + # Exchanges can be configured in the server or created at runtime. + # + # GRAMMAR: + # + # exchange = C:DECLARE S:DECLARE-OK + # / C:DELETE S:DELETE-OK + # + # RULE: + # + # The server MUST implement the direct and fanout exchange + # types, and predeclare the corresponding exchanges named + # amq.direct and amq.fanout in each virtual host. The server + # MUST also predeclare a direct exchange to act as the default + # exchange for content Publish methods and for default queue + # bindings. + # + # RULE: + # + # The server SHOULD implement the topic exchange type, and + # predeclare the corresponding exchange named amq.topic in + # each virtual host. + # + # RULE: + # + # The server MAY implement the system exchange type, and + # predeclare the corresponding exchanges named amq.system in + # each virtual host. If the client attempts to bind a queue to + # the system exchange, the server MUST raise a connection + # exception with reply code 507 (not allowed). + # + # RULE: + # + # The default exchange MUST be defined as internal, and be + # inaccessible to the client except by specifying an empty + # exchange name in a content Publish method. That is, the + # server MUST NOT let clients make explicit bindings to this + # exchange. + # + # + + def exchange_declare(self, exchange, type, passive=False, durable=False, + auto_delete=True, internal=False, nowait=False, + arguments=None, ticket=None): + """ + declare exchange, create if needed + + This method creates an exchange if it does not already exist, + and if the exchange exists, verifies that it is of the correct + and expected class. + + RULE: + + The server SHOULD support a minimum of 16 exchanges per + virtual host and ideally, impose no limit except as + defined by available resources. + + PARAMETERS: + exchange: shortstr + + RULE: + + Exchange names starting with "amq." are reserved + for predeclared and standardised exchanges. If + the client attempts to create an exchange starting + with "amq.", the server MUST raise a channel + exception with reply code 403 (access refused). + + type: shortstr + + exchange type + + Each exchange belongs to one of a set of exchange + types implemented by the server. The exchange types + define the functionality of the exchange - i.e. how + messages are routed through it. It is not valid or + meaningful to attempt to change the type of an + existing exchange. + + RULE: + + If the exchange already exists with a different + type, the server MUST raise a connection exception + with a reply code 507 (not allowed). + + RULE: + + If the server does not support the requested + exchange type it MUST raise a connection exception + with a reply code 503 (command invalid). + + passive: boolean + + do not create exchange + + If set, the server will not create the exchange. The + client can use this to check whether an exchange + exists without modifying the server state. + + RULE: + + If set, and the exchange does not already exist, + the server MUST raise a channel exception with + reply code 404 (not found). + + durable: boolean + + request a durable exchange + + If set when creating a new exchange, the exchange will + be marked as durable. Durable exchanges remain active + when a server restarts. Non-durable exchanges + (transient exchanges) are purged if/when a server + restarts. + + RULE: + + The server MUST support both durable and transient + exchanges. + + RULE: + + The server MUST ignore the durable field if the + exchange already exists. + + auto_delete: boolean + + auto-delete when unused + + If set, the exchange is deleted when all queues have + finished using it. + + RULE: + + The server SHOULD allow for a reasonable delay + between the point when it determines that an + exchange is not being used (or no longer used), + and the point when it deletes the exchange. At + the least it must allow a client to create an + exchange and then bind a queue to it, with a small + but non-zero delay between these two actions. + + RULE: + + The server MUST ignore the auto-delete field if + the exchange already exists. + + internal: boolean + + create internal exchange + + If set, the exchange may not be used directly by + publishers, but only when bound to other exchanges. + Internal exchanges are used to construct wiring that + is not visible to applications. + + nowait: boolean + + do not send a reply method + + If set, the server will not respond to the method. The + client should not wait for a reply method. If the + server could not complete the method it will raise a + channel or connection exception. + + arguments: table + + arguments for declaration + + A set of arguments for the declaration. The syntax and + semantics of these arguments depends on the server + implementation. This field is ignored if passive is + True. + + ticket: short + + When a client defines a new exchange, this belongs to + the access realm of the ticket used. All further work + done with that exchange must be done with an access + ticket for the same realm. + + RULE: + + The client MUST provide a valid access ticket + giving "active" access to the realm in which the + exchange exists or will be created, or "passive" + access if the if-exists flag is set. + + """ + if arguments is None: + arguments = {} + + args = AMQPWriter() + if ticket is not None: + args.write_short(ticket) + else: + args.write_short(self.default_ticket) + args.write_shortstr(exchange) + args.write_shortstr(type) + args.write_bit(passive) + args.write_bit(durable) + args.write_bit(auto_delete) + args.write_bit(internal) + args.write_bit(nowait) + args.write_table(arguments) + self._send_method((40, 10), args) + + if not nowait: + return self.wait(allowed_methods=[ + (40, 11), # Channel.exchange_declare_ok + ]) + + + def _exchange_declare_ok(self, args): + """ + confirms an exchange declaration + + This method confirms a Declare method and confirms the name of + the exchange, essential for automatically-named exchanges. + + """ + pass + + + def exchange_delete(self, exchange, if_unused=False, + nowait=False, ticket=None): + """ + delete an exchange + + This method deletes an exchange. When an exchange is deleted + all queue bindings on the exchange are cancelled. + + PARAMETERS: + exchange: shortstr + + RULE: + + The exchange MUST exist. Attempting to delete a + non-existing exchange causes a channel exception. + + if_unused: boolean + + delete only if unused + + If set, the server will only delete the exchange if it + has no queue bindings. If the exchange has queue + bindings the server does not delete it but raises a + channel exception instead. + + RULE: + + If set, the server SHOULD delete the exchange but + only if it has no queue bindings. + + RULE: + + If set, the server SHOULD raise a channel + exception if the exchange is in use. + + nowait: boolean + + do not send a reply method + + If set, the server will not respond to the method. The + client should not wait for a reply method. If the + server could not complete the method it will raise a + channel or connection exception. + + ticket: short + + RULE: + + The client MUST provide a valid access ticket + giving "active" access rights to the exchange's + access realm. + + """ + args = AMQPWriter() + if ticket is not None: + args.write_short(ticket) + else: + args.write_short(self.default_ticket) + args.write_shortstr(exchange) + args.write_bit(if_unused) + args.write_bit(nowait) + self._send_method((40, 20), args) + + if not nowait: + return self.wait(allowed_methods=[ + (40, 21), # Channel.exchange_delete_ok + ]) + + + def _exchange_delete_ok(self, args): + """ + confirm deletion of an exchange + + This method confirms the deletion of an exchange. + + """ + pass + + + ############# + # + # Queue + # + # + # work with queues + # + # Queues store and forward messages. Queues can be configured in + # the server or created at runtime. Queues must be attached to at + # least one exchange in order to receive messages from publishers. + # + # GRAMMAR: + # + # queue = C:DECLARE S:DECLARE-OK + # / C:BIND S:BIND-OK + # / C:PURGE S:PURGE-OK + # / C:DELETE S:DELETE-OK + # + # RULE: + # + # A server MUST allow any content class to be sent to any + # queue, in any mix, and queue and delivery these content + # classes independently. Note that all methods that fetch + # content off queues are specific to a given content class. + # + # + + def queue_bind(self, queue, exchange, routing_key='', + nowait=False, arguments=None, ticket=None): + """ + bind queue to an exchange + + This method binds a queue to an exchange. Until a queue is + bound it will not receive any messages. In a classic + messaging model, store-and-forward queues are bound to a dest + exchange and subscription queues are bound to a dest_wild + exchange. + + RULE: + + A server MUST allow ignore duplicate bindings - that is, + two or more bind methods for a specific queue, with + identical arguments - without treating these as an error. + + RULE: + + If a bind fails, the server MUST raise a connection + exception. + + RULE: + + The server MUST NOT allow a durable queue to bind to a + transient exchange. If the client attempts this the server + MUST raise a channel exception. + + RULE: + + Bindings for durable queues are automatically durable and + the server SHOULD restore such bindings after a server + restart. + + RULE: + + If the client attempts to an exchange that was declared as + internal, the server MUST raise a connection exception + with reply code 530 (not allowed). + + RULE: + + The server SHOULD support at least 4 bindings per queue, + and ideally, impose no limit except as defined by + available resources. + + PARAMETERS: + queue: shortstr + + Specifies the name of the queue to bind. If the queue + name is empty, refers to the current queue for the + channel, which is the last declared queue. + + RULE: + + If the client did not previously declare a queue, + and the queue name in this method is empty, the + server MUST raise a connection exception with + reply code 530 (not allowed). + + RULE: + + If the queue does not exist the server MUST raise + a channel exception with reply code 404 (not + found). + + exchange: shortstr + + The name of the exchange to bind to. + + RULE: + + If the exchange does not exist the server MUST + raise a channel exception with reply code 404 (not + found). + + routing_key: shortstr + + message routing key + + Specifies the routing key for the binding. The + routing key is used for routing messages depending on + the exchange configuration. Not all exchanges use a + routing key - refer to the specific exchange + documentation. If the routing key is empty and the + queue name is empty, the routing key will be the + current queue for the channel, which is the last + declared queue. + + nowait: boolean + + do not send a reply method + + If set, the server will not respond to the method. The + client should not wait for a reply method. If the + server could not complete the method it will raise a + channel or connection exception. + + arguments: table + + arguments for binding + + A set of arguments for the binding. The syntax and + semantics of these arguments depends on the exchange + class. + + ticket: short + + The client provides a valid access ticket giving + "active" access rights to the queue's access realm. + + """ + if arguments is None: + arguments = {} + + args = AMQPWriter() + if ticket is not None: + args.write_short(ticket) + else: + args.write_short(self.default_ticket) + args.write_shortstr(queue) + args.write_shortstr(exchange) + args.write_shortstr(routing_key) + args.write_bit(nowait) + args.write_table(arguments) + self._send_method((50, 20), args) + + if not nowait: + return self.wait(allowed_methods=[ + (50, 21), # Channel.queue_bind_ok + ]) + + + def _queue_bind_ok(self, args): + """ + confirm bind successful + + This method confirms that the bind was successful. + + """ + pass + + + def queue_declare(self, queue='', passive=False, durable=False, + exclusive=False, auto_delete=True, nowait=False, + arguments=None, ticket=None): + """ + declare queue, create if needed + + This method creates or checks a queue. When creating a new + queue the client can specify various properties that control + the durability of the queue and its contents, and the level of + sharing for the queue. + + RULE: + + The server MUST create a default binding for a newly- + created queue to the default exchange, which is an + exchange of type 'direct'. + + RULE: + + The server SHOULD support a minimum of 256 queues per + virtual host and ideally, impose no limit except as + defined by available resources. + + PARAMETERS: + queue: shortstr + + RULE: + + The queue name MAY be empty, in which case the + server MUST create a new queue with a unique + generated name and return this to the client in + the Declare-Ok method. + + RULE: + + Queue names starting with "amq." are reserved for + predeclared and standardised server queues. If + the queue name starts with "amq." and the passive + option is False, the server MUST raise a connection + exception with reply code 403 (access refused). + + passive: boolean + + do not create queue + + If set, the server will not create the queue. The + client can use this to check whether a queue exists + without modifying the server state. + + RULE: + + If set, and the queue does not already exist, the + server MUST respond with a reply code 404 (not + found) and raise a channel exception. + + durable: boolean + + request a durable queue + + If set when creating a new queue, the queue will be + marked as durable. Durable queues remain active when + a server restarts. Non-durable queues (transient + queues) are purged if/when a server restarts. Note + that durable queues do not necessarily hold persistent + messages, although it does not make sense to send + persistent messages to a transient queue. + + RULE: + + The server MUST recreate the durable queue after a + restart. + + RULE: + + The server MUST support both durable and transient + queues. + + RULE: + + The server MUST ignore the durable field if the + queue already exists. + + exclusive: boolean + + request an exclusive queue + + Exclusive queues may only be consumed from by the + current connection. Setting the 'exclusive' flag + always implies 'auto-delete'. + + RULE: + + The server MUST support both exclusive (private) + and non-exclusive (shared) queues. + + RULE: + + The server MUST raise a channel exception if + 'exclusive' is specified and the queue already + exists and is owned by a different connection. + + auto_delete: boolean + + auto-delete queue when unused + + If set, the queue is deleted when all consumers have + finished using it. Last consumer can be cancelled + either explicitly or because its channel is closed. If + there was no consumer ever on the queue, it won't be + deleted. + + RULE: + + The server SHOULD allow for a reasonable delay + between the point when it determines that a queue + is not being used (or no longer used), and the + point when it deletes the queue. At the least it + must allow a client to create a queue and then + create a consumer to read from it, with a small + but non-zero delay between these two actions. The + server should equally allow for clients that may + be disconnected prematurely, and wish to re- + consume from the same queue without losing + messages. We would recommend a configurable + timeout, with a suitable default value being one + minute. + + RULE: + + The server MUST ignore the auto-delete field if + the queue already exists. + + nowait: boolean + + do not send a reply method + + If set, the server will not respond to the method. The + client should not wait for a reply method. If the + server could not complete the method it will raise a + channel or connection exception. + + arguments: table + + arguments for declaration + + A set of arguments for the declaration. The syntax and + semantics of these arguments depends on the server + implementation. This field is ignored if passive is + True. + + ticket: short + + When a client defines a new queue, this belongs to the + access realm of the ticket used. All further work + done with that queue must be done with an access + ticket for the same realm. + + The client provides a valid access ticket giving + "active" access to the realm in which the queue exists + or will be created, or "passive" access if the if- + exists flag is set. + + Returns a tuple containing 3 items: + the name of the queue (essential for automatically-named queues) + message count + consumer count + + """ + if arguments is None: + arguments = {} + + args = AMQPWriter() + if ticket is not None: + args.write_short(ticket) + else: + args.write_short(self.default_ticket) + args.write_shortstr(queue) + args.write_bit(passive) + args.write_bit(durable) + args.write_bit(exclusive) + args.write_bit(auto_delete) + args.write_bit(nowait) + args.write_table(arguments) + self._send_method((50, 10), args) + + if not nowait: + return self.wait(allowed_methods=[ + (50, 11), # Channel.queue_declare_ok + ]) + + + def _queue_declare_ok(self, args): + """ + confirms a queue definition + + This method confirms a Declare method and confirms the name of + the queue, essential for automatically-named queues. + + PARAMETERS: + queue: shortstr + + Reports the name of the queue. If the server generated + a queue name, this field contains that name. + + message_count: long + + number of messages in queue + + Reports the number of messages in the queue, which + will be zero for newly-created queues. + + consumer_count: long + + number of consumers + + Reports the number of active consumers for the queue. + Note that consumers can suspend activity + (Channel.Flow) in which case they do not appear in + this count. + + """ + queue = args.read_shortstr() + message_count = args.read_long() + consumer_count = args.read_long() + + return queue, message_count, consumer_count + + + def queue_delete(self, queue='', if_unused=False, if_empty=False, + nowait=False, ticket=None): + """ + delete a queue + + This method deletes a queue. When a queue is deleted any + pending messages are sent to a dead-letter queue if this is + defined in the server configuration, and all consumers on the + queue are cancelled. + + RULE: + + The server SHOULD use a dead-letter queue to hold messages + that were pending on a deleted queue, and MAY provide + facilities for a system administrator to move these + messages back to an active queue. + + PARAMETERS: + queue: shortstr + + Specifies the name of the queue to delete. If the + queue name is empty, refers to the current queue for + the channel, which is the last declared queue. + + RULE: + + If the client did not previously declare a queue, + and the queue name in this method is empty, the + server MUST raise a connection exception with + reply code 530 (not allowed). + + RULE: + + The queue must exist. Attempting to delete a non- + existing queue causes a channel exception. + + if_unused: boolean + + delete only if unused + + If set, the server will only delete the queue if it + has no consumers. If the queue has consumers the + server does does not delete it but raises a channel + exception instead. + + RULE: + + The server MUST respect the if-unused flag when + deleting a queue. + + if_empty: boolean + + delete only if empty + + If set, the server will only delete the queue if it + has no messages. If the queue is not empty the server + raises a channel exception. + + nowait: boolean + + do not send a reply method + + If set, the server will not respond to the method. The + client should not wait for a reply method. If the + server could not complete the method it will raise a + channel or connection exception. + + ticket: short + + The client provides a valid access ticket giving + "active" access rights to the queue's access realm. + + """ + args = AMQPWriter() + if ticket is not None: + args.write_short(ticket) + else: + args.write_short(self.default_ticket) + + args.write_shortstr(queue) + args.write_bit(if_unused) + args.write_bit(if_empty) + args.write_bit(nowait) + self._send_method((50, 40), args) + + if not nowait: + return self.wait(allowed_methods=[ + (50, 41), # Channel.queue_delete_ok + ]) + + + def _queue_delete_ok(self, args): + """ + confirm deletion of a queue + + This method confirms the deletion of a queue. + + PARAMETERS: + message_count: long + + number of messages purged + + Reports the number of messages purged. + + """ + return args.read_long() + + + def queue_purge(self, queue='', nowait=False, ticket=None): + """ + purge a queue + + This method removes all messages from a queue. It does not + cancel consumers. Purged messages are deleted without any + formal "undo" mechanism. + + RULE: + + A call to purge MUST result in an empty queue. + + RULE: + + On transacted channels the server MUST not purge messages + that have already been sent to a client but not yet + acknowledged. + + RULE: + + The server MAY implement a purge queue or log that allows + system administrators to recover accidentally-purged + messages. The server SHOULD NOT keep purged messages in + the same storage spaces as the live messages since the + volumes of purged messages may get very large. + + PARAMETERS: + queue: shortstr + + Specifies the name of the queue to purge. If the + queue name is empty, refers to the current queue for + the channel, which is the last declared queue. + + RULE: + + If the client did not previously declare a queue, + and the queue name in this method is empty, the + server MUST raise a connection exception with + reply code 530 (not allowed). + + RULE: + + The queue must exist. Attempting to purge a non- + existing queue causes a channel exception. + + nowait: boolean + + do not send a reply method + + If set, the server will not respond to the method. The + client should not wait for a reply method. If the + server could not complete the method it will raise a + channel or connection exception. + + ticket: short + + The access ticket must be for the access realm that + holds the queue. + + RULE: + + The client MUST provide a valid access ticket + giving "read" access rights to the queue's access + realm. Note that purging a queue is equivalent to + reading all messages and discarding them. + + if nowait is False, returns a message_count + + """ + args = AMQPWriter() + if ticket is not None: + args.write_short(ticket) + else: + args.write_short(self.default_ticket) + args.write_shortstr(queue) + args.write_bit(nowait) + self._send_method((50, 30), args) + + if not nowait: + return self.wait(allowed_methods=[ + (50, 31), # Channel.queue_purge_ok + ]) + + + def _queue_purge_ok(self, args): + """ + confirms a queue purge + + This method confirms the purge of a queue. + + PARAMETERS: + message_count: long + + number of messages purged + + Reports the number of messages purged. + + """ + return args.read_long() + + + ############# + # + # Basic + # + # + # work with basic content + # + # The Basic class provides methods that support an industry- + # standard messaging model. + # + # GRAMMAR: + # + # basic = C:QOS S:QOS-OK + # / C:CONSUME S:CONSUME-OK + # / C:CANCEL S:CANCEL-OK + # / C:PUBLISH content + # / S:RETURN content + # / S:DELIVER content + # / C:GET ( S:GET-OK content / S:GET-EMPTY ) + # / C:ACK + # / C:REJECT + # + # RULE: + # + # The server SHOULD respect the persistent property of basic + # messages and SHOULD make a best-effort to hold persistent + # basic messages on a reliable storage mechanism. + # + # RULE: + # + # The server MUST NOT discard a persistent basic message in + # case of a queue overflow. The server MAY use the + # Channel.Flow method to slow or stop a basic message + # publisher when necessary. + # + # RULE: + # + # The server MAY overflow non-persistent basic messages to + # persistent storage and MAY discard or dead-letter non- + # persistent basic messages on a priority basis if the queue + # size exceeds some configured limit. + # + # RULE: + # + # The server MUST implement at least 2 priority levels for + # basic messages, where priorities 0-4 and 5-9 are treated as + # two distinct levels. The server MAY implement up to 10 + # priority levels. + # + # RULE: + # + # The server MUST deliver messages of the same priority in + # order irrespective of their individual persistence. + # + # RULE: + # + # The server MUST support both automatic and explicit + # acknowledgements on Basic content. + # + + def basic_ack(self, delivery_tag, multiple=False): + """ + acknowledge one or more messages + + This method acknowledges one or more messages delivered via + the Deliver or Get-Ok methods. The client can ask to confirm + a single message or a set of messages up to and including a + specific message. + + PARAMETERS: + delivery_tag: longlong + + server-assigned delivery tag + + The server-assigned and channel-specific delivery tag + + RULE: + + The delivery tag is valid only within the channel + from which the message was received. I.e. a client + MUST NOT receive a message on one channel and then + acknowledge it on another. + + RULE: + + The server MUST NOT use a zero value for delivery + tags. Zero is reserved for client use, meaning "all + messages so far received". + + multiple: boolean + + acknowledge multiple messages + + If set to True, the delivery tag is treated as "up to + and including", so that the client can acknowledge + multiple messages with a single method. If set to + False, the delivery tag refers to a single message. + If the multiple field is True, and the delivery tag + is zero, tells the server to acknowledge all + outstanding mesages. + + RULE: + + The server MUST validate that a non-zero delivery- + tag refers to an delivered message, and raise a + channel exception if this is not the case. + + """ + args = AMQPWriter() + args.write_longlong(delivery_tag) + args.write_bit(multiple) + self._send_method((60, 80), args) + + + def basic_cancel(self, consumer_tag, nowait=False): + """ + end a queue consumer + + This method cancels a consumer. This does not affect already + delivered messages, but it does mean the server will not send + any more messages for that consumer. The client may receive + an abitrary number of messages in between sending the cancel + method and receiving the cancel-ok reply. + + RULE: + + If the queue no longer exists when the client sends a + cancel command, or the consumer has been cancelled for + other reasons, this command has no effect. + + PARAMETERS: + consumer_tag: shortstr + + consumer tag + + Identifier for the consumer, valid within the current + connection. + + RULE: + + The consumer tag is valid only within the channel + from which the consumer was created. I.e. a client + MUST NOT create a consumer in one channel and then + use it in another. + + nowait: boolean + + do not send a reply method + + If set, the server will not respond to the method. The + client should not wait for a reply method. If the + server could not complete the method it will raise a + channel or connection exception. + + """ + args = AMQPWriter() + args.write_shortstr(consumer_tag) + args.write_bit(nowait) + self._send_method((60, 30), args) + return self.wait(allowed_methods=[ + (60, 31), # Channel.basic_cancel_ok + ]) + + + def _basic_cancel_ok(self, args): + """ + confirm a cancelled consumer + + This method confirms that the cancellation was completed. + + PARAMETERS: + consumer_tag: shortstr + + consumer tag + + Identifier for the consumer, valid within the current + connection. + + RULE: + + The consumer tag is valid only within the channel + from which the consumer was created. I.e. a client + MUST NOT create a consumer in one channel and then + use it in another. + + """ + consumer_tag = args.read_shortstr() + del self.callbacks[consumer_tag] + + + def basic_consume(self, queue='', consumer_tag='', no_local=False, + no_ack=False, exclusive=False, nowait=False, + callback=None, ticket=None): + """ + start a queue consumer + + This method asks the server to start a "consumer", which is a + transient request for messages from a specific queue. + Consumers last as long as the channel they were created on, or + until the client cancels them. + + RULE: + + The server SHOULD support at least 16 consumers per queue, + unless the queue was declared as private, and ideally, + impose no limit except as defined by available resources. + + PARAMETERS: + queue: shortstr + + Specifies the name of the queue to consume from. If + the queue name is null, refers to the current queue + for the channel, which is the last declared queue. + + RULE: + + If the client did not previously declare a queue, + and the queue name in this method is empty, the + server MUST raise a connection exception with + reply code 530 (not allowed). + + consumer_tag: shortstr + + Specifies the identifier for the consumer. The + consumer tag is local to a connection, so two clients + can use the same consumer tags. If this field is empty + the server will generate a unique tag. + + RULE: + + The tag MUST NOT refer to an existing consumer. If + the client attempts to create two consumers with + the same non-empty tag the server MUST raise a + connection exception with reply code 530 (not + allowed). + + no_local: boolean + + do not deliver own messages + + If the no-local field is set the server will not send + messages to the client that published them. + + no_ack: boolean + + no acknowledgement needed + + If this field is set the server does not expect + acknowledgments for messages. That is, when a message + is delivered to the client the server automatically and + silently acknowledges it on behalf of the client. This + functionality increases performance but at the cost of + reliability. Messages can get lost if a client dies + before it can deliver them to the application. + + exclusive: boolean + + request exclusive access + + Request exclusive consumer access, meaning only this + consumer can access the queue. + + RULE: + + If the server cannot grant exclusive access to the + queue when asked, - because there are other + consumers active - it MUST raise a channel + exception with return code 403 (access refused). + + nowait: boolean + + do not send a reply method + + If set, the server will not respond to the method. The + client should not wait for a reply method. If the + server could not complete the method it will raise a + channel or connection exception. + + callback: Python callable + + function/method called with each delivered message + + For each message delivered by the broker, the + callable will be called with a Message object + as the single argument. If no callable is specified, + messages are quietly discarded, no_ack should probably + be set to True in that case. + + ticket: short + + RULE: + + The client MUST provide a valid access ticket + giving "read" access rights to the realm for the + queue. + + """ + args = AMQPWriter() + if ticket is not None: + args.write_short(ticket) + else: + args.write_short(self.default_ticket) + args.write_shortstr(queue) + args.write_shortstr(consumer_tag) + args.write_bit(no_local) + args.write_bit(no_ack) + args.write_bit(exclusive) + args.write_bit(nowait) + self._send_method((60, 20), args) + + if not nowait: + consumer_tag = self.wait(allowed_methods=[ + (60, 21), # Channel.basic_consume_ok + ]) + + self.callbacks[consumer_tag] = callback + + return consumer_tag + + + def _basic_consume_ok(self, args): + """ + confirm a new consumer + + The server provides the client with a consumer tag, which is + used by the client for methods called on the consumer at a + later stage. + + PARAMETERS: + consumer_tag: shortstr + + Holds the consumer tag specified by the client or + provided by the server. + + """ + return args.read_shortstr() + + + def _basic_deliver(self, args, msg): + """ + notify the client of a consumer message + + This method delivers a message to the client, via a consumer. + In the asynchronous message delivery model, the client starts + a consumer using the Consume method, then the server responds + with Deliver methods as and when messages arrive for that + consumer. + + RULE: + + The server SHOULD track the number of times a message has + been delivered to clients and when a message is + redelivered a certain number of times - e.g. 5 times - + without being acknowledged, the server SHOULD consider the + message to be unprocessable (possibly causing client + applications to abort), and move the message to a dead + letter queue. + + PARAMETERS: + consumer_tag: shortstr + + consumer tag + + Identifier for the consumer, valid within the current + connection. + + RULE: + + The consumer tag is valid only within the channel + from which the consumer was created. I.e. a client + MUST NOT create a consumer in one channel and then + use it in another. + + delivery_tag: longlong + + server-assigned delivery tag + + The server-assigned and channel-specific delivery tag + + RULE: + + The delivery tag is valid only within the channel + from which the message was received. I.e. a client + MUST NOT receive a message on one channel and then + acknowledge it on another. + + RULE: + + The server MUST NOT use a zero value for delivery + tags. Zero is reserved for client use, meaning "all + messages so far received". + + redelivered: boolean + + message is being redelivered + + This indicates that the message has been previously + delivered to this or another client. + + exchange: shortstr + + Specifies the name of the exchange that the message + was originally published to. + + routing_key: shortstr + + Message routing key + + Specifies the routing key name specified when the + message was published. + + """ + consumer_tag = args.read_shortstr() + delivery_tag = args.read_longlong() + redelivered = args.read_bit() + exchange = args.read_shortstr() + routing_key = args.read_shortstr() + + msg.delivery_info = { + 'channel': self, + 'consumer_tag': consumer_tag, + 'delivery_tag': delivery_tag, + 'redelivered': redelivered, + 'exchange': exchange, + 'routing_key': routing_key, + } + + func = self.callbacks.get(consumer_tag, None) + if func is not None: + func(msg) + + + def basic_get(self, queue='', no_ack=False, ticket=None): + """ + direct access to a queue + + This method provides a direct access to the messages in a + queue using a synchronous dialogue that is designed for + specific types of application where synchronous functionality + is more important than performance. + + PARAMETERS: + queue: shortstr + + Specifies the name of the queue to consume from. If + the queue name is null, refers to the current queue + for the channel, which is the last declared queue. + + RULE: + + If the client did not previously declare a queue, + and the queue name in this method is empty, the + server MUST raise a connection exception with + reply code 530 (not allowed). + + no_ack: boolean + + no acknowledgement needed + + If this field is set the server does not expect + acknowledgments for messages. That is, when a message + is delivered to the client the server automatically and + silently acknowledges it on behalf of the client. This + functionality increases performance but at the cost of + reliability. Messages can get lost if a client dies + before it can deliver them to the application. + + ticket: short + + RULE: + + The client MUST provide a valid access ticket + giving "read" access rights to the realm for the + queue. + + Non-blocking, returns a message object, or None. + + """ + args = AMQPWriter() + if ticket is not None: + args.write_short(ticket) + else: + args.write_short(self.default_ticket) + args.write_shortstr(queue) + args.write_bit(no_ack) + self._send_method((60, 70), args) + return self.wait(allowed_methods=[ + (60, 71), # Channel.basic_get_ok + (60, 72), # Channel.basic_get_empty + ]) + + + def _basic_get_empty(self, args): + """ + indicate no messages available + + This method tells the client that the queue has no messages + available for the client. + + PARAMETERS: + cluster_id: shortstr + + Cluster id + + For use by cluster applications, should not be used by + client applications. + + """ + cluster_id = args.read_shortstr() + + + def _basic_get_ok(self, args, msg): + """ + provide client with a message + + This method delivers a message to the client following a get + method. A message delivered by 'get-ok' must be acknowledged + unless the no-ack option was set in the get method. + + PARAMETERS: + delivery_tag: longlong + + server-assigned delivery tag + + The server-assigned and channel-specific delivery tag + + RULE: + + The delivery tag is valid only within the channel + from which the message was received. I.e. a client + MUST NOT receive a message on one channel and then + acknowledge it on another. + + RULE: + + The server MUST NOT use a zero value for delivery + tags. Zero is reserved for client use, meaning "all + messages so far received". + + redelivered: boolean + + message is being redelivered + + This indicates that the message has been previously + delivered to this or another client. + + exchange: shortstr + + Specifies the name of the exchange that the message + was originally published to. If empty, the message + was published to the default exchange. + + routing_key: shortstr + + Message routing key + + Specifies the routing key name specified when the + message was published. + + message_count: long + + number of messages pending + + This field reports the number of messages pending on + the queue, excluding the message being delivered. + Note that this figure is indicative, not reliable, and + can change arbitrarily as messages are added to the + queue and removed by other clients. + + """ + delivery_tag = args.read_longlong() + redelivered = args.read_bit() + exchange = args.read_shortstr() + routing_key = args.read_shortstr() + message_count = args.read_long() + + msg.delivery_info = { + 'delivery_tag': delivery_tag, + 'redelivered': redelivered, + 'exchange': exchange, + 'routing_key': routing_key, + 'message_count': message_count + } + + return msg + + + def basic_publish(self, msg, exchange='', routing_key='', + mandatory=False, immediate=False, ticket=None): + """ + publish a message + + This method publishes a message to a specific exchange. The + message will be routed to queues as defined by the exchange + configuration and distributed to any active consumers when the + transaction, if any, is committed. + + PARAMETERS: + exchange: shortstr + + Specifies the name of the exchange to publish to. The + exchange name can be empty, meaning the default + exchange. If the exchange name is specified, and that + exchange does not exist, the server will raise a + channel exception. + + RULE: + + The server MUST accept a blank exchange name to + mean the default exchange. + + RULE: + + If the exchange was declared as an internal + exchange, the server MUST raise a channel + exception with a reply code 403 (access refused). + + RULE: + + The exchange MAY refuse basic content in which + case it MUST raise a channel exception with reply + code 540 (not implemented). + + routing_key: shortstr + + Message routing key + + Specifies the routing key for the message. The + routing key is used for routing messages depending on + the exchange configuration. + + mandatory: boolean + + indicate mandatory routing + + This flag tells the server how to react if the message + cannot be routed to a queue. If this flag is True, the + server will return an unroutable message with a Return + method. If this flag is False, the server silently + drops the message. + + RULE: + + The server SHOULD implement the mandatory flag. + + immediate: boolean + + request immediate delivery + + This flag tells the server how to react if the message + cannot be routed to a queue consumer immediately. If + this flag is set, the server will return an + undeliverable message with a Return method. If this + flag is zero, the server will queue the message, but + with no guarantee that it will ever be consumed. + + RULE: + + The server SHOULD implement the immediate flag. + + ticket: short + + RULE: + + The client MUST provide a valid access ticket + giving "write" access rights to the access realm + for the exchange. + + """ + args = AMQPWriter() + if ticket is not None: + args.write_short(ticket) + else: + args.write_short(self.default_ticket) + args.write_shortstr(exchange) + args.write_shortstr(routing_key) + args.write_bit(mandatory) + args.write_bit(immediate) + + self._send_method((60, 40), args, msg) + + + def basic_qos(self, prefetch_size, prefetch_count, a_global): + """ + specify quality of service + + This method requests a specific quality of service. The QoS + can be specified for the current channel or for all channels + on the connection. The particular properties and semantics of + a qos method always depend on the content class semantics. + Though the qos method could in principle apply to both peers, + it is currently meaningful only for the server. + + PARAMETERS: + prefetch_size: long + + prefetch window in octets + + The client can request that messages be sent in + advance so that when the client finishes processing a + message, the following message is already held + locally, rather than needing to be sent down the + channel. Prefetching gives a performance improvement. + This field specifies the prefetch window size in + octets. The server will send a message in advance if + it is equal to or smaller in size than the available + prefetch size (and also falls into other prefetch + limits). May be set to zero, meaning "no specific + limit", although other prefetch limits may still + apply. The prefetch-size is ignored if the no-ack + option is set. + + RULE: + + The server MUST ignore this setting when the + client is not processing any messages - i.e. the + prefetch size does not limit the transfer of + single messages to a client, only the sending in + advance of more messages while the client still + has one or more unacknowledged messages. + + prefetch_count: short + + prefetch window in messages + + Specifies a prefetch window in terms of whole + messages. This field may be used in combination with + the prefetch-size field; a message will only be sent + in advance if both prefetch windows (and those at the + channel and connection level) allow it. The prefetch- + count is ignored if the no-ack option is set. + + RULE: + + The server MAY send less data in advance than + allowed by the client's specified prefetch windows + but it MUST NOT send more. + + a_global: boolean + + apply to entire connection + + By default the QoS settings apply to the current + channel only. If this field is set, they are applied + to the entire connection. + + """ + args = AMQPWriter() + args.write_long(prefetch_size) + args.write_short(prefetch_count) + args.write_bit(a_global) + self._send_method((60, 10), args) + return self.wait(allowed_methods=[ + (60, 11), # Channel.basic_qos_ok + ]) + + + def _basic_qos_ok(self, args): + """ + confirm the requested qos + + This method tells the client that the requested QoS levels + could be handled by the server. The requested QoS applies to + all active consumers until a new QoS is defined. + + """ + pass + + + def basic_recover(self, requeue=False): + """ + redeliver unacknowledged messages + + This method asks the broker to redeliver all unacknowledged + messages on a specified channel. Zero or more messages may be + redelivered. This method is only allowed on non-transacted + channels. + + RULE: + + The server MUST set the redelivered flag on all messages + that are resent. + + RULE: + + The server MUST raise a channel exception if this is + called on a transacted channel. + + PARAMETERS: + requeue: boolean + + requeue the message + + If this field is False, the message will be redelivered + to the original recipient. If this field is True, the + server will attempt to requeue the message, + potentially then delivering it to an alternative + subscriber. + + """ + args = AMQPWriter() + args.write_bit(requeue) + self._send_method((60, 100), args) + + + def basic_reject(self, delivery_tag, requeue): + """ + reject an incoming message + + This method allows a client to reject a message. It can be + used to interrupt and cancel large incoming messages, or + return untreatable messages to their original queue. + + RULE: + + The server SHOULD be capable of accepting and process the + Reject method while sending message content with a Deliver + or Get-Ok method. I.e. the server should read and process + incoming methods while sending output frames. To cancel a + partially-send content, the server sends a content body + frame of size 1 (i.e. with no data except the frame-end + octet). + + RULE: + + The server SHOULD interpret this method as meaning that + the client is unable to process the message at this time. + + RULE: + + A client MUST NOT use this method as a means of selecting + messages to process. A rejected message MAY be discarded + or dead-lettered, not necessarily passed to another + client. + + PARAMETERS: + delivery_tag: longlong + + server-assigned delivery tag + + The server-assigned and channel-specific delivery tag + + RULE: + + The delivery tag is valid only within the channel + from which the message was received. I.e. a client + MUST NOT receive a message on one channel and then + acknowledge it on another. + + RULE: + + The server MUST NOT use a zero value for delivery + tags. Zero is reserved for client use, meaning "all + messages so far received". + + requeue: boolean + + requeue the message + + If this field is False, the message will be discarded. + If this field is True, the server will attempt to + requeue the message. + + RULE: + + The server MUST NOT deliver the message to the + same client within the context of the current + channel. The recommended strategy is to attempt + to deliver the message to an alternative consumer, + and if that is not possible, to move the message + to a dead-letter queue. The server MAY use more + sophisticated tracking to hold the message on the + queue and redeliver it to the same client at a + later stage. + + """ + args = AMQPWriter() + args.write_longlong(delivery_tag) + args.write_bit(requeue) + self._send_method((60, 90), args) + + + def _basic_return(self, args, msg): + """ + return a failed message + + This method returns an undeliverable message that was + published with the "immediate" flag set, or an unroutable + message published with the "mandatory" flag set. The reply + code and text provide information about the reason that the + message was undeliverable. + + PARAMETERS: + reply_code: short + + The reply code. The AMQ reply codes are defined in AMQ + RFC 011. + + reply_text: shortstr + + The localised reply text. This text can be logged as an + aid to resolving issues. + + exchange: shortstr + + Specifies the name of the exchange that the message + was originally published to. + + routing_key: shortstr + + Message routing key + + Specifies the routing key name specified when the + message was published. + + """ + reply_code = args.read_short() + reply_text = args.read_shortstr() + exchange = args.read_shortstr() + routing_key = args.read_shortstr() + + self.returned_messages.put( + (reply_code, reply_text, exchange, routing_key, msg) + ) + + + ############# + # + # Tx + # + # + # work with standard transactions + # + # Standard transactions provide so-called "1.5 phase commit". We + # can ensure that work is never lost, but there is a chance of + # confirmations being lost, so that messages may be resent. + # Applications that use standard transactions must be able to + # detect and ignore duplicate messages. + # + # GRAMMAR: + # + # tx = C:SELECT S:SELECT-OK + # / C:COMMIT S:COMMIT-OK + # / C:ROLLBACK S:ROLLBACK-OK + # + # RULE: + # + # An client using standard transactions SHOULD be able to + # track all messages received within a reasonable period, and + # thus detect and reject duplicates of the same message. It + # SHOULD NOT pass these to the application layer. + # + # + + def tx_commit(self): + """ + commit the current transaction + + This method commits all messages published and acknowledged in + the current transaction. A new transaction starts immediately + after a commit. + + """ + self._send_method((90, 20)) + return self.wait(allowed_methods=[ + (90, 21), # Channel.tx_commit_ok + ]) + + + def _tx_commit_ok(self, args): + """ + confirm a successful commit + + This method confirms to the client that the commit succeeded. + Note that if a commit fails, the server raises a channel + exception. + + """ + pass + + + def tx_rollback(self): + """ + abandon the current transaction + + This method abandons all messages published and acknowledged + in the current transaction. A new transaction starts + immediately after a rollback. + + """ + self._send_method((90, 30)) + return self.wait(allowed_methods=[ + (90, 31), # Channel.tx_rollback_ok + ]) + + + def _tx_rollback_ok(self, args): + """ + confirm a successful rollback + + This method confirms to the client that the rollback + succeeded. Note that if an rollback fails, the server raises a + channel exception. + + """ + pass + + + def tx_select(self): + """ + select standard transaction mode + + This method sets the channel to use standard transactions. + The client must use this method at least once on a channel + before using the Commit or Rollback methods. + + """ + self._send_method((90, 10)) + return self.wait(allowed_methods=[ + (90, 11), # Channel.tx_select_ok + ]) + + + def _tx_select_ok(self, args): + """ + confirm transaction mode + + This method confirms to the client that the channel was + successfully set to use standard transactions. + + """ + pass + + + _METHOD_MAP = { + (20, 11): _open_ok, + (20, 20): _flow, + (20, 21): _flow_ok, + (20, 30): _alert, + (20, 40): _close, + (20, 41): _close_ok, + (30, 11): _access_request_ok, + (40, 11): _exchange_declare_ok, + (40, 21): _exchange_delete_ok, + (50, 11): _queue_declare_ok, + (50, 21): _queue_bind_ok, + (50, 31): _queue_purge_ok, + (50, 41): _queue_delete_ok, + (60, 11): _basic_qos_ok, + (60, 21): _basic_consume_ok, + (60, 31): _basic_cancel_ok, + (60, 50): _basic_return, + (60, 60): _basic_deliver, + (60, 71): _basic_get_ok, + (60, 72): _basic_get_empty, + (90, 11): _tx_select_ok, + (90, 21): _tx_commit_ok, + (90, 31): _tx_rollback_ok, + } |