summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl8
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_amqqueue_process.erl59
-rw-r--r--src/rabbit_basic.erl16
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_error_logger.erl10
-rw-r--r--src/rabbit_exchange.erl20
-rw-r--r--src/rabbit_router.erl59
-rw-r--r--src/rabbit_tests.erl2
9 files changed, 109 insertions, 89 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 50ddafbaef..a28409311c 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -66,6 +66,8 @@
-record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id, next_seq_id}).
+-record(delivery, {mandatory, immediate, txn, sender, message}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -137,6 +139,12 @@
guid :: guid(),
is_persistent :: bool()}).
-type(message() :: basic_message()).
+-type(delivery() ::
+ #delivery{mandatory :: bool(),
+ immediate :: bool(),
+ txn :: maybe(txn()),
+ sender :: pid(),
+ message :: message()}).
%% this really should be an abstract type
-type(msg_id() :: non_neg_integer()).
-type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 542ea242dc..08c67946dc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
+ stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
@@ -85,7 +85,7 @@
{'error', 'in_use'} |
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
--spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()).
+-spec(deliver/2 :: (pid(), delivery()) -> bool()).
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
@@ -243,13 +243,16 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
-deliver(_IsMandatory, true, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
-deliver(true, _IsImmediate, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
+deliver(QPid, #delivery{immediate = true,
+ txn = Txn, sender = ChPid, message = Message}) ->
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid},
+ infinity);
+deliver(QPid, #delivery{mandatory = true,
+ txn = Txn, sender = ChPid, message = Message}) ->
+ gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity),
true;
-deliver(false, _IsImmediate, Txn, Message, QPid) ->
- gen_server2:cast(QPid, {deliver, Txn, Message}),
+deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
+ gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
true.
redeliver(QPid, Messages) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4e02f2e4e5..a542172b66 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -66,6 +66,7 @@
monitor_ref,
unacked_messages,
is_limit_active,
+ txn,
unsent_message_count}).
-define(INFO_KEYS,
@@ -136,6 +137,7 @@ ch_record(ChPid) ->
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
is_limit_active = false,
+ txn = none,
unsent_message_count = 0},
put(Key, C),
C;
@@ -159,6 +161,11 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
+record_current_channel_tx(ChPid, Txn) ->
+ %% as a side effect this also starts monitoring the channel (if
+ %% that wasn't happening already)
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
+
deliver_queue(Fun, FunAcc0,
State = #q{q = #amqqueue{name = QName},
round_robin = RoundRobin,
@@ -232,7 +239,7 @@ run_message_queue(State) ->
{undefined, State2} = deliver_queue(fun deliver_from_queue/3, undefined, State),
State2.
-attempt_immediate_delivery(none, Msg, State) ->
+attempt_immediate_delivery(none, _ChPid, Msg, State) ->
Fun =
fun (is_message_ready, false, _State) ->
true;
@@ -248,13 +255,13 @@ attempt_immediate_delivery(none, Msg, State) ->
{{Msg, false, AckTag, 0}, true, State3}
end,
deliver_queue(Fun, false, State);
-attempt_immediate_delivery(Txn, Msg, State) ->
+attempt_immediate_delivery(Txn, ChPid, Msg, State) ->
{ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state),
- record_pending_message(Txn, Msg),
+ record_pending_message(Txn, ChPid, Msg),
{true, State #q { mixed_state = MS }}.
-deliver_or_enqueue(Txn, Msg, State) ->
- case attempt_immediate_delivery(Txn, Msg, State) of
+deliver_or_enqueue(Txn, ChPid, Msg, State) ->
+ case attempt_immediate_delivery(Txn, ChPid, Msg, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
@@ -351,24 +358,30 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
round_robin = ActiveConsumers}) ->
case lookup_ch(DownPid) of
not_found -> noreply(State);
- #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
+ #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
+ unacked_messages = UAM} ->
NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
+ State1 =
+ case Txn of
+ none -> State;
+ _ -> rollback_transaction(Txn, State)
+ end,
case check_auto_delete(
deliver_or_requeue_n(
[MsgWithAck ||
{_MsgId, MsgWithAck} <- dict:to_list(UAM)],
- State#q{
+ State1 # q {
exclusive_consumer = case Holder of
{ChPid, _} -> none;
Other -> Other
end,
round_robin = NewActive})) of
- {continue, NewState} ->
- noreply(NewState);
- {stop, NewState} ->
- {stop, normal, NewState}
+ {continue, State2} ->
+ noreply(State2);
+ {stop, State2} ->
+ {stop, normal, State2}
end
end.
@@ -428,15 +441,18 @@ all_tx_record() ->
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
-record_pending_message(Txn, Message = #basic_message { is_persistent = IsPersistent }) ->
+record_pending_message(Txn, ChPid, Message = #basic_message { is_persistent = IsPersistent }) ->
Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn),
+ record_current_channel_tx(ChPid, Txn),
store_tx(Txn, Tx #tx { pending_messages = [Message | Pending],
is_persistent = IsPersistentTxn orelse IsPersistent
}).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}).
+ record_current_channel_tx(ChPid, Txn),
+ store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
+ ch_pid = ChPid}).
commit_transaction(Txn, State) ->
#tx { ch_pid = ChPid,
@@ -465,6 +481,7 @@ rollback_transaction(Txn, State) ->
#tx { pending_messages = PendingMessages
} = lookup_tx(Txn),
{ok, MS} = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state),
+ erase_tx(Txn),
State #q { mixed_state = MS }.
%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C
@@ -519,7 +536,7 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call({deliver_immediately, Txn, Message}, _From, State) ->
+handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -533,12 +550,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_immediate_delivery(Txn, Message, State),
+ {Delivered, NewState} = attempt_immediate_delivery(Txn, ChPid, Message, State),
reply(Delivered, NewState);
-handle_call({deliver, Txn, Message}, _From, State) ->
+handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
+ {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
@@ -692,9 +709,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
reply(locked, State)
end.
-handle_cast({deliver, Txn, Message}, State) ->
+handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
+ {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
noreply(NewState);
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
@@ -716,9 +733,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
end;
handle_cast({rollback, Txn}, State) ->
- NewState = rollback_transaction(Txn, State),
- erase_tx(Txn),
- noreply(NewState);
+ noreply(rollback_transaction(Txn, State));
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 1d44543aac..0673bdd8d2 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,14 +33,15 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/4, message/4]).
+-export([publish/1, message/4, delivery/4]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(publish/4 :: (bool(), bool(), maybe(txn()), message()) ->
+-spec(publish/1 :: (delivery()) ->
{ok, routing_result(), [pid()]} | not_found()).
+-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) ->
message()).
@@ -48,17 +49,20 @@
%%----------------------------------------------------------------------------
-publish(Mandatory, Immediate, Txn,
- Message = #basic_message{exchange_name = ExchangeName}) ->
+publish(Delivery = #delivery{
+ message = #basic_message{exchange_name = ExchangeName}}) ->
case rabbit_exchange:lookup(ExchangeName) of
{ok, X} ->
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(X, Mandatory, Immediate, Txn, Message),
+ {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
{ok, RoutingRes, DeliveredQPids};
Other ->
Other
end.
+delivery(Mandatory, Immediate, Txn, Message) ->
+ #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
+ sender = self(), message = Message}.
+
message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
Content = #content{class_id = ClassId,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5142f9b7a3..ed71509725 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -321,8 +321,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
guid = rabbit_guid:guid(),
is_persistent = is_message_persistent(DecodedContent)},
{RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey,
- Message),
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
case RoutingRes of
routed ->
ok;
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index d73edb73b8..76016a8cb2 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -75,8 +75,10 @@ publish(_Other, _Format, _Data, _State) ->
publish1(RoutingKey, Format, Data, LogExch) ->
{ok, _RoutingRes, _DeliveredQPids} =
- rabbit_basic:publish(false, false, none,
- rabbit_basic:message(
- LogExch, RoutingKey, <<"text/plain">>,
- list_to_binary(io_lib:format(Format, Data)))),
+ rabbit_basic:publish(
+ rabbit_basic:delivery(
+ false, false, none,
+ rabbit_basic:message(
+ LogExch, RoutingKey, <<"text/plain">>,
+ list_to_binary(io_lib:format(Format, Data))))),
ok.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index ca0e337b84..7d9948f06f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
- publish/5]).
+ publish/2]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
@@ -72,8 +72,7 @@
-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
--spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) ->
- {routing_result(), [pid()]}).
+-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -188,13 +187,12 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X, Mandatory, Immediate, Txn, Message) ->
- publish(X, [], Mandatory, Immediate, Txn, Message).
+publish(X, Delivery) ->
+ publish(X, [], Delivery).
-publish(X, Seen, Mandatory, Immediate, Txn,
- Message = #basic_message{routing_key = RK, content = C}) ->
- case rabbit_router:deliver(route(X, RK, C),
- Mandatory, Immediate, Txn, Message) of
+publish(X, Seen, Delivery = #delivery{
+ message = #basic_message{routing_key = RK, content = C}}) ->
+ case rabbit_router:deliver(route(X, RK, C), Delivery) of
{_, []} = R ->
#exchange{name = XName, arguments = Args} = X,
case rabbit_misc:r_arg(XName, exchange, Args,
@@ -209,9 +207,7 @@ publish(X, Seen, Mandatory, Immediate, Txn,
false ->
case lookup(AName) of
{ok, AX} ->
- publish(AX, NewSeen,
- Mandatory, Immediate, Txn,
- Message);
+ publish(AX, NewSeen, Delivery);
{error, not_found} ->
rabbit_log:warning(
"alternate exchange for ~s "
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 57166428bf..10f80cc301 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
-export([start_link/0,
- deliver/5]).
+ deliver/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -50,8 +50,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) ->
- {routing_result(), [pid()]}).
+-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
-endif.
@@ -62,13 +61,13 @@ start_link() ->
-ifdef(BUG19758).
-deliver(QPids, Mandatory, Immediate, Txn, Message) ->
- check_delivery(Mandatory, Immediate,
- run_bindings(QPids, Mandatory, Immediate, Txn, Message)).
+deliver(QPids, Delivery) ->
+ check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
+ run_bindings(QPids, Delivery)).
-else.
-deliver(QPids, Mandatory, Immediate, Txn, Message) ->
+deliver(QPids, Delivery) ->
%% we reduce inter-node traffic by grouping the qpids by node and
%% only delivering one copy of the message to each node involved,
%% which then in turn delivers it to its queues.
@@ -81,16 +80,14 @@ deliver(QPids, Mandatory, Immediate, Txn, Message) ->
[QPid], D)
end,
dict:new(), QPids)),
- Mandatory, Immediate, Txn, Message).
+ Delivery).
-deliver_per_node([{Node, QPids}], Mandatory, Immediate,
- Txn, Message)
- when Node == node() ->
+deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
%% optimisation
- check_delivery(Mandatory, Immediate,
- run_bindings(QPids, Mandatory, Immediate, Txn, Message));
-deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
- Txn, Message) ->
+ check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
+ run_bindings(QPids, Delivery));
+deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver in run_bindings below will deliver the
%% message to the queue process asynchronously, and return true,
@@ -101,20 +98,16 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
{routed,
lists:flatmap(
fun ({Node, QPids}) ->
- gen_server2:cast(
- {?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message}),
+ gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}),
QPids
end,
NodeQPids)};
-deliver_per_node(NodeQPids, Mandatory, Immediate,
- Txn, Message) ->
+deliver_per_node(NodeQPids, Delivery) ->
R = rabbit_misc:upmap(
fun ({Node, QPids}) ->
- try gen_server2:call(
- {?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message},
- infinity)
+ try gen_server2:call({?SERVER, Node},
+ {deliver, QPids, Delivery},
+ infinity)
catch
_Class:_Reason ->
%% TODO: figure out what to log (and do!) here
@@ -131,7 +124,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
end,
{false, []},
R),
- check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}).
+ check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
+ {Routed, lists:append(Handled)}).
-endif.
@@ -140,19 +134,17 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
init([]) ->
{ok, no_state}.
-handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message},
- From, State) ->
+handle_call({deliver, QPids, Delivery}, From, State) ->
spawn(
fun () ->
- R = run_bindings(QPids, Mandatory, Immediate, Txn, Message),
+ R = run_bindings(QPids, Delivery),
gen_server2:reply(From, R)
end),
{noreply, State}.
-handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message},
- State) ->
+handle_cast({deliver, QPids, Delivery}, State) ->
%% in order to preserve message ordering we must not spawn here
- run_bindings(QPids, Mandatory, Immediate, Txn, Message),
+ run_bindings(QPids, Delivery),
{noreply, State}.
handle_info(_Info, State) ->
@@ -166,11 +158,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
+run_bindings(QPids, Delivery) ->
lists:foldl(
fun (QPid, {Routed, Handled}) ->
- case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate,
- Txn, Message, QPid) of
+ case catch rabbit_amqqueue:deliver(QPid, Delivery) of
true -> {true, [QPid | Handled]};
false -> {true, Handled};
{'EXIT', _Reason} -> {Routed, Handled}
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 4b7487b0c0..5d3c27703b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -56,7 +56,7 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
- passed = test_disk_queue(),
+ %%passed = test_disk_queue(),
passed.
test_priority_queue() ->