summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-11-28 11:15:20 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2010-11-28 11:15:20 +0000
commitd34d84aa046ffb30ed50c98596ad071f773bde07 (patch)
treed76161af59c267783f1565b709b44ef52a10b1b1
parent963d834477df9a4bedaca26b13430b6ee8ef2858 (diff)
downloadrabbitmq-server-bug23552.tar.gz
experimental single-frame publish/deliver commandsbug23552
-rw-r--r--src/rabbit_channel.erl50
1 files changed, 45 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6bed63a3..0cbfa9a1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -474,6 +474,44 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
_ -> add_tx_participants(DeliveredQPids, State)
end};
+handle_method(#'basic.send'{exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ mandatory = Mandatory,
+ immediate = Immediate,
+ content = Payload},
+ _, State = #ch{virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid}) ->
+ Content = rabbit_basic:build_content(#'P_basic'{}, Payload),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_write_permitted(ExchangeName, State),
+ Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ %% We decode the content's properties here because we're almost
+ %% certain to want to look at delivery-mode and priority.
+ DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ IsPersistent = is_message_persistent(DecodedContent),
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = DecodedContent,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent},
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
+ case RoutingRes of
+ routed -> ok;
+ unroutable -> ok = basic_return(Message, WriterPid, no_route);
+ not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
+ end,
+ maybe_incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State),
+ {noreply, case TxnKey of
+ none -> State;
+ _ -> add_tx_participants(DeliveredQPids, State)
+ end};
+
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
@@ -1116,16 +1154,18 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
{_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
- content = Content}}) ->
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
+ content = #content{
+ payload_fragments_rev = [Payload]}}}) ->
+ M = #'basic.receive'{consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
redelivered = Redelivered,
exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
+ routing_key = RoutingKey,
+ content = Payload},
ok = case Notify of
true -> rabbit_writer:send_command_and_notify(
- WriterPid, QPid, self(), M, Content);
- false -> rabbit_writer:send_command(WriterPid, M, Content)
+ WriterPid, QPid, self(), M);
+ false -> rabbit_writer:send_command(WriterPid, M)
end.
terminate(_State) ->