summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl136
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_mixed_queue.erl4
3 files changed, 64 insertions, 87 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 15b3a03655..8fe0d62386 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -175,12 +175,12 @@ deliver_queue(Fun, FunAcc0,
case Fun(AckRequired, FunAcc0, State) of
{empty, FunAcc1, State2} ->
{FunAcc1, State2};
- {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, FunAcc1, State2} ->
+ {{Msg, IsDelivered, AckTag, Remaining}, FunAcc1, State2} ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}), %% TODO FIXME
+ {QName, self(), NextId, IsDelivered, Msg}),
NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
+ true -> dict:store(NextId, {Msg, AckTag}, UAM);
false -> UAM
end,
NewC = C#cr{unsent_message_count = Count + 1,
@@ -201,81 +201,74 @@ deliver_queue(Fun, FunAcc0,
false ->
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_queue(Fun, FunAcc0, State#q{round_robin = NewConsumers})
+ deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers })
end;
{empty, _} ->
{FunAcc0, State}
end.
-deliver_from_queue(AckRequired, Acc, State = #q { mixed_state = MS }) ->
+deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) ->
{Res, MS2} = rabbit_mixed_queue:deliver(MS),
MS3 = case {Res, AckRequired} of
- {empty, _} -> MS2;
{_, true} -> MS2;
- {{_MsgId, _Msg, _MsgSize, _IsDelivered, AckTag, _Remaining}, false} ->
+ {empty, _} -> MS2;
+ {{_Msg, _IsDelivered, AckTag, _Remaining}, false} ->
{ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2),
- MS3
+ MS4
end,
{Res, Acc, State #q { mixed_state = MS3 }}.
run_message_queue(State) ->
- {undefined, State2} = deliver_queue(deliver_from_queue/1, undefined, State),
+ {undefined, State2} = deliver_queue(fun deliver_from_queue/3, undefined, State),
State2.
-attempt_delivery(none, Message, State) ->
+attempt_immediate_delivery(none, Msg, State) ->
Fun =
fun (AckRequired, false, State2) ->
{AckTag, State3} =
if AckRequired ->
- %% TODO API CHANGE
- {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Message,
+ {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Msg,
State2 #q.mixed_state),
{AckTag2, State2 #q { mixed_state = MS }};
true ->
{noack, State2}
end,
- {{MsgId, Message, MsgSize, false, AckTag, 0}, true, State3} %% TODO FIX ME
+ {{Msg, false, AckTag, 0}, true, State3}
end,
deliver_queue(Fun, false, State);
-attempt_delivery(Txn, Message, State) ->
- {ok, MS} = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE
- record_pending_message(Txn, Message),
+attempt_immediate_delivery(Txn, Msg, State) ->
+ {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state),
+ record_pending_message(Txn, Msg),
{true, State #q { mixed_state = MS }}.
-deliver_or_enqueue(Txn, Message, State) ->
- case attempt_delivery(Txn, Message, State) of
+deliver_or_enqueue(Txn, Msg, State) ->
+ case attempt_immediate_delivery(Txn, Msg, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- {ok, MS} = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE
+ {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state),
{false, NewState #q { mixed_state = MS }}
end.
%% all these messages have already been delivered at least once and
%% not ack'd, but need to be either redelivered or requeued
-deliver_or_requeue_n(Messages, State) ->
- {AutoAcks, Remaining} =
- dropwhilefoldl(deliver_or_requeue_msg/2, {[], State}, Messages),
- {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State #q.mixed_state), %% TODO FIXME
- case Remaining of
- [] -> run_message_queue(State #q { mixed_state = MS });
- _ -> {ok, MS2} = rabbit_mixed_queue:requeue(Remaining, MS), %% TODO FIXME
- State #q { mixed_state = MS2 }
+deliver_or_requeue_n([], State) ->
+ State;
+deliver_or_requeue_n(MsgsWithAcks, State) ->
+ {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
+ deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State),
+ {ok, MS} = rabbit_mixed_queue:ack(lists:reverse(AutoAcks), NewState #q.mixed_state),
+ case OutstandingMsgs of
+ [] -> run_message_queue(NewState #q { mixed_state = MS });
+ _ -> {ok, MS2} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS),
+ NewState #q { mixed_state = MS2 }
end.
-deliver_or_requeue_msg(Message, {AcksAcc, State}) -> %% TODO the acktag really should be within the msg here
- Fun = fun (AckRequired, {false, AcksAcc}, State2) ->
- AcksAcc2 = if AckRequired -> AcksAcc;
- true -> [AckTag|AcksAcc]
- end,
- {{MsgId, Message, MsgSize, true, AckTag, 0}, {true, AcksAcc2}, State2} end, %% TODO FIX ME
- case deliver_queue(Fun, {false, AcksAcc}, State) of
- {{true, AcksAcc3}, State1} ->
- {true, {AcksAcc3, State1}};
- {{false, AcksAcc}, State1} ->
- {false, {AcksAcc, State1}}
- end.
+deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+ {{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State};
+deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
+ {{Msg, true, AckTag, Len}, {Len - 1, [AcksAcc], MsgsWithAcks}, State}.
block_consumers(ChPid, RoundRobin) ->
%%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]),
@@ -350,8 +343,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
erase({ch, ChPid}),
case check_auto_delete(
deliver_or_requeue_n(
- [Message || %% TODO NEED TO GRAB ACKTAGS OUT OF HERE AND PASS THEM THROUGH
- {_Messsage_id, Message} <- dict:to_list(UAM)],
+ [MsgWithAck ||
+ {_MsgId, MsgWithAck} <- dict:to_list(UAM)],
State#q{
exclusive_consumer = case Holder of
{ChPid, _} -> none;
@@ -421,10 +414,6 @@ all_tx_record() ->
all_tx() ->
[Txn || {{txn, Txn}, _} <- get()].
-is_tx_persistent(Txn) ->
- #tx{is_persistent = Res} = lookup_tx(Txn),
- Res.
-
record_pending_message(Txn, Message = #basic_message { is_persistent = IsPersistent }) ->
Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Message, false} | Pending],
@@ -445,12 +434,13 @@ commit_transaction(Txn, State) ->
case lookup_ch(ChPid) of
not_found -> State;
C = #cr { unacked_messages = UAM } ->
- {Acked, Remaining} =
- collect_messages(PendingAcksAppended, UAM),
+ {MsgWithAcks, Remaining} =
+ collect_messages(PendingAcksOrdered, UAM),
store_ch_record(C#cr{unacked_messages = Remaining}),
- MS = rabbit_mixed_queue:tx_commit(PendingMessagesOrdered,
- Acked,
- State #q.mixed_state),
+ MS = rabbit_mixed_queue:tx_commit(
+ PendingMessagesOrdered,
+ lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
+ State #q.mixed_state),
State #q { mixed_state = MS }
end.
@@ -461,7 +451,7 @@ rollback_transaction(Txn, State) ->
State #q { mixed_state = MS }.
%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C
-%% err, A = C `intersect` D , via projection through the dict that is A
+%% err, A = C `intersect` D , via projection through the dict that is C
collect_messages(MsgIds, UAM) ->
lists:mapfoldl(
fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end,
@@ -475,8 +465,8 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
-i(messages_ready, #q{message_buffer = MessageBuffer}) ->
- queue:len(MessageBuffer);
+i(messages_ready, #q { mixed_state = MS }) ->
+ rabbit_mixed_queue:length(MS);
i(messages_unacknowledged, _) ->
lists:sum([dict:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
@@ -526,7 +516,7 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, Message, State),
+ {Delivered, NewState} = attempt_immediate_delivery(Txn, Message, State),
reply(Delivered, NewState);
handle_call({deliver, Txn, Message}, _From, State) ->
@@ -553,23 +543,23 @@ handle_call({basic_get, ChPid, NoAck}, _From,
}) ->
case rabbit_mixed_queue:deliver(MS) of
{empty, MS2} -> reply(empty, State #q { mixed_state = MS2 });
- {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, MS2} ->
+ {{Msg, IsDelivered, AckTag, Remaining}, MS2} ->
AckRequired = not(NoAck),
- MS3 =
+ {ok, MS3} =
case AckRequired of
true ->
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = dict:store(NextId, Message, UAM),
+ NewUAM = dict:store(NextId, {Msg, AckTag}, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- MS2;
+ {ok, MS2};
false ->
rabbit_mixed_queue:ack([AckTag], MS2)
end,
- Message = {QName, self(), NextId, IsDelivered, Msg}, %% TODO, FIX UP
+ Message = {QName, self(), NextId, IsDelivered, Msg},
reply({ok, Remaining, Message},
- State#q{next_msg_id = NextId + 1});
- {empty, _} ->
- reply(empty, State)
+ State #q { next_msg_id = NextId + 1,
+ mixed_state = MS3
+ })
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
@@ -645,10 +635,10 @@ handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
reply({ok, Name, Length, queue:len(RoundRobin)}, State #q { mixed_state = MS2 });
handle_call({delete, IfUnused, IfEmpty}, _From,
- State = #q{message_buffer = MessageBuffer, mixed_state = MS}) ->
- IsEmpty = queue:is_empty(MessageBuffer),
- IsUnused = is_unused(),
+ State = #q { mixed_state = MS }) ->
{Length, MS2} = rabbit_mixed_queue:length(MS),
+ IsEmpty = Length == 0,
+ IsUnused = is_unused(),
if
IfEmpty and not(IsEmpty) ->
reply({error, not_empty}, State);
@@ -695,25 +685,23 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
not_found ->
noreply(State);
C = #cr{unacked_messages = UAM} ->
- {Acked, Remaining} = collect_messages(MsgIds, UAM),
+ {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
case Txn of
none ->
- MS = rabbit_mixed_queue:ack(Acked, State #q.mixed_state), %% TODO API
+ Acks = lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
+ {ok, MS} = rabbit_mixed_queue:ack(Acks, State #q.mixed_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
noreply(State #q { mixed_state = MS });
_ ->
record_pending_acks(Txn, ChPid, MsgIds),
noreply(State)
- end,
+ end
end;
handle_cast({rollback, Txn}, State) ->
NewState = rollback_transaction(Txn, State),
erase_tx(Txn),
- noreply(State2);
-
-handle_cast({redeliver, Messages}, State) ->
- noreply(ok); %% TODO - probably remove - only used by the old persister
+ noreply(NewState);
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
@@ -722,9 +710,9 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[ChPid]),
noreply(State);
C = #cr{unacked_messages = UAM} ->
- {Messages, NewUAM} = collect_messages(MsgIds, UAM), %% TODO Messages must contain AckTags too
+ {MsgWithAcks, NewUAM} = collect_messages(MsgIds, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- noreply(deliver_or_requeue_n(Messages, State))
+ noreply(deliver_or_requeue_n(MsgWithAcks, State))
end;
handle_cast({unblock, ChPid}, State) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index f90abe3ffb..f207038eff 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -51,7 +51,6 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
--export([dropwhilefoldl/3]).
-import(mnesia).
-import(lists).
@@ -407,13 +406,3 @@ stop_applications(Apps) ->
not_started,
cannot_stop_application,
Apps).
-
-dropwhilefoldl(_PredFun, Acc0, []) ->
- {Acc0, []};
-dropwhilefoldl(PredFun, Acc0, [E|List]) ->
- case PredFun(E, Acc0) of
- {true, Acc1} ->
- dropwhilefoldl(PredFun, Acc1, List);
- {false, Acc1} ->
- {Acc1, List}
- end.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index e56e667d4f..24d0de8d59 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -80,8 +80,8 @@ publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersist
State = #mqstate { mode = Mode, queue = Q })
when Mode =:= disk orelse IsPersistent ->
ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
- {MsgId, false, Ack, 0} = rabbit_disk_queue:phantom_deliver(Q),
- {ok, Ack, State};
+ {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
+ {ok, AckTag, State};
publish_delivered(#basic_message { is_persistent = false },
State = #mqstate { mode = mixed }) ->
{ok, noack, State}.