summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl116
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_channel.erl188
-rw-r--r--src/rabbit_connection_sup.erl1
-rw-r--r--src/rabbit_msg_store.erl229
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_queue_index.erl26
-rw-r--r--src/rabbit_reader.erl30
-rw-r--r--src/rabbit_tests.erl19
-rw-r--r--src/rabbit_variable_queue.erl133
10 files changed, 434 insertions, 320 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 981dd31daa..fde543467a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -374,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- {State2, ChAckTags1} =
+ ChAckTags1 =
case AckRequired of
- true -> {State1,
- sets:add_element(AckTag, ChAckTags)};
- false -> {confirm_message(Message, State1),
- ChAckTags}
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -396,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State3 = State2#q{
+ State2 = State1#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State3);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
true = maybe_store_ch_record(C#cr{is_limit_active = true}),
@@ -427,22 +425,36 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-confirm_messages(Guids, State) ->
- lists:foldl(fun confirm_message_by_guid/2, State, Guids).
-
-confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
- case dict:find(Guid, GTC) of
- {ok, {_ , undefined}} -> ok;
- {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
+ {CMs, GTC1} =
+ lists:foldl(
+ fun(Guid, {CMs, GTC0}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)};
+ _ ->
+ {CMs, GTC0}
+ end
+ end, {[], GTC}, Guids),
+ case lists:usort(CMs) of
+ [{Ch, MsgSeqNo} | CMs1] ->
+ [rabbit_channel:confirm(ChPid, MsgSeqNos) ||
+ {ChPid, MsgSeqNos} <- group_confirms_by_channel(
+ CMs1, [{Ch, [MsgSeqNo]}])];
+ [] ->
+ ok
end,
- State#q{guid_to_channel = dict:erase(Guid, GTC)}.
+ State#q{guid_to_channel = GTC1}.
-confirm_message(#basic_message{guid = Guid}, State) ->
- confirm_message_by_guid(Guid, State).
+group_confirms_by_channel([], Acc) ->
+ Acc;
+group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]);
+group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]).
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- State;
+ {no_confirm, State};
record_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
@@ -451,14 +463,10 @@ record_confirm_message(#delivery{sender = ChPid,
State =
#q{guid_to_channel = GTC,
q = #amqqueue{durable = true}}) ->
- State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)};
+ {confirm,
+ State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
record_confirm_message(_Delivery, State) ->
- State.
-
-ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckdGuids, BQS1} = BQ:ack(AckTags, BQS),
- confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
+ {no_confirm, State}.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -473,12 +481,12 @@ attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
- State = #q{backing_queue = BQ, q = Q}) ->
- NeedsConfirming = Message#basic_message.is_persistent andalso
- Q#amqqueue.durable,
- case NeedsConfirming of
- false -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+ {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
+ %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
+ case {NeedsConfirming, MsgSeqNo} of
+ {_, undefined} -> ok;
+ {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ {confirm, _} -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -490,31 +498,37 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = NeedsConfirming},
+ needs_confirming = (NeedsConfirming =:= confirm)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
+ {Delivered, State1} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
+ {Delivered, NeedsConfirming, State1};
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ {NeedsConfirming,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
record_current_channel_tx(ChPid, Txn),
{true,
+ NeedsConfirming,
State#q{backing_queue_state =
BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, State1} ->
+ {true, _, State1} ->
{true, State1};
- {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery,
+ {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}} ->
+ #delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
- needs_confirming = (MsgSeqNo =/= undefined)},
+ needs_confirming =
+ (NeedsConfirming =:= confirm)},
BQS),
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
@@ -823,7 +837,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
+handle_call({deliver_immediately, Delivery},
_From, State) ->
%% Synchronous, "immediate" delivery mode
%%
@@ -838,12 +852,9 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, State1} =
+ {Delivered, _NeedsConfirming, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, case Delivered of
- true -> State1;
- false -> confirm_message(Message, State1)
- end);
+ reply(Delivered, State1);
handle_call({deliver, Delivery}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
@@ -881,7 +892,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
sets:add_element(AckTag,
ChAckTags)}),
State2;
- false -> confirm_message(Message, State2)
+ false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg}, State3)
@@ -1019,8 +1030,8 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- NewState = ack_by_acktags(AckTags, State),
- {NewC, NewState};
+ BQS1 = BQ:ack(AckTags, BQS),
+ {NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
State#q{backing_queue_state = BQS1}}
@@ -1029,7 +1040,9 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State1)
end;
-handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -1038,7 +1051,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> ack_by_acktags(AckTags, State)
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 79910b95aa..233e2b9060 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -97,7 +97,7 @@ description() ->
{description, <<"Internal user / password database">>}].
check_user_login(Username, []) ->
- internal_check_user_login(Username, fun() -> true end);
+ internal_check_user_login(Username, fun(_) -> true end);
check_user_login(Username, [{password, Password}]) ->
internal_check_user_login(
Username,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2067e306cf..930e48e6cd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1]).
@@ -49,8 +49,7 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, queues_for_msg}).
+ confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -72,8 +71,6 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_CONFIRMS_INTERVAL, 1000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -97,8 +94,7 @@
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
--spec(flush_confirms/1 :: (pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -137,11 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
-
-flush_confirms(Pid) ->
- gen_server2:cast(Pid, flush_confirms).
+confirm(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
list() ->
pg_local:get_members(rabbit_channels).
@@ -192,9 +185,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
- publish_seqno = 0,
- confirm_multiple = false,
- held_confirms = gb_sets:new(),
+ publish_seqno = 1,
unconfirmed = gb_sets:new(),
queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -292,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_confirms, State) ->
- {noreply, internal_flush_confirms(State)};
-
-handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, confirm(MsgSeqNo, From, State)}.
+handle_cast({confirm, MsgSeqNos, From}, State) ->
+ {noreply, confirm(MsgSeqNos, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM}) ->
@@ -304,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> confirm(Msg, QPid, State0);
+ 0 -> confirm([Msg], QPid, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- State1 = internal_flush_confirms(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
State, [{idle_since, now()}])
end),
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
- {hibernate, State1#ch{stats_timer = StatsTimer1}}.
+ {hibernate, State#ch{stats_timer = StatsTimer1}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -484,51 +471,39 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-confirm(undefined, _QPid, State) ->
+confirm([], _QPid, State) ->
State;
-confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{
- delivery_tag = MSN}),
- State1
- end, State);
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_confirm_timer(
- State1#ch{held_confirms = gb_sets:add(MSN, As)})
- end, State).
-
-do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
- State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
- %% clears references to MsgSeqNo and does ConfirmFun
- case gb_sets:is_element(MsgSeqNo, UC) of
- true ->
- Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
- case QPid of
- undefined ->
- ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1});
- _ ->
- {ok, Qs} = dict:find(MsgSeqNo, QFM),
- Qs1 = sets:del_element(QPid, Qs),
- case sets:size(Qs1) of
- 0 -> ConfirmFun(MsgSeqNo,
- State#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM),
- unconfirmed = Unconfirmed1});
- _ -> State#ch{queues_for_msg =
- dict:store(MsgSeqNo, Qs1, QFM)}
- end
- end;
- false ->
- State
- end.
+confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC,
+ queues_for_msg = QFM}) ->
+ MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)],
+ MS = gb_sets:from_list(MsgSeqNos),
+ QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM),
+ send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS),
+ queues_for_msg = QFM1});
+confirm(MsgSeqNos, QPid, State) ->
+ {DoneMessages, State1} =
+ lists:foldl(
+ fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0,
+ queues_for_msg = QFM0}}) ->
+ case gb_sets:is_element(MsgSeqNo, UC0) of
+ false -> {DMs, State0};
+ true -> {ok, Qs} = dict:find(MsgSeqNo, QFM0),
+ Qs1 = sets:del_element(QPid, Qs),
+ case sets:size(Qs1) of
+ 0 -> {[MsgSeqNo | DMs],
+ State0#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM0),
+ unconfirmed =
+ gb_sets:delete(MsgSeqNo, UC0)}};
+ _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
+ {DMs, State0#ch{queues_for_msg = QFM1}}
+ end
+ end
+ end, {[], State}, MsgSeqNos),
+ send_confirms(DoneMessages, State1).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -1010,20 +985,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from tx to confirm mode", []);
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = false}) ->
- return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
+ return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = true,
- confirm_multiple = Multiple}) ->
- return_ok(State, NoWait, #'confirm.select_ok'{});
-
-handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot change confirm_multiple setting", []);
-
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -1253,12 +1218,12 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _,
@@ -1272,47 +1237,28 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL,
- ?MODULE, flush_confirms, [self()]),
- State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
- State.
-
-stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+send_confirms([], State) ->
State;
-stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#ch{confirm_tref = undefined}.
-
-internal_flush_confirms(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs}) ->
- case gb_sets:is_empty(Cs) of
- true -> State#ch{confirm_tref = undefined};
- false -> [First | Rest] = gb_sets:to_list(Cs),
- {Mult, Inds} = find_consecutive_sequence(First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult, multiple = true}),
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Inds),
- State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}
- end.
-
-%% Find longest sequence of consecutive numbers at the beginning.
-find_consecutive_sequence(Last, []) ->
- {Last, []};
-find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
- find_consecutive_sequence(N, Ns);
-find_consecutive_sequence(Last, Ns) ->
- {Last, Ns}.
+send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ SCs = lists:usort(Cs),
+ CutOff = case gb_sets:is_empty(UC) of
+ true -> lists:last(SCs) + 1;
+ false -> gb_sets:smallest(UC)
+ end,
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
+ case Ms of
+ [] -> ok;
+ _ -> ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
+ multiple = true})
+ end,
+ ok = lists:foldl(fun(T, ok) ->
+ rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = T})
+ end, ok, Ss),
+ State.
-terminate(State) ->
- stop_confirm_timer(State),
+terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index ff3995b54a..a6b1f7faad 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -78,4 +78,3 @@ reader(Pid) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
-
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2e1834c796..1fe06a1f3a 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -81,6 +81,7 @@
file_summary_ets, %% tid of the file summary table
dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
+ dying_clients, %% set of dying clients
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
@@ -306,6 +307,17 @@
%% sure that reads are not attempted from files which are in the
%% process of being garbage collected.
%%
+%% When a message is removed, its reference count is decremented. Even
+%% if the reference count becomes 0, its entry is not removed. This is
+%% because in the event of the same message being sent to several
+%% different queues, there is the possibility of one queue writing and
+%% removing the message before other queues write it at all. Thus
+%% accomodating 0-reference counts allows us to avoid unnecessary
+%% writes here. Of course, there are complications: the file to which
+%% the message has already been written could be locked pending
+%% deletion or GC, which means we have to rewrite the message as the
+%% original copy will now be lost.
+%%
%% The server automatically defers reads, removes and contains calls
%% that occur which refer to files which are currently being
%% GC'd. Contains calls are only deferred in order to ensure they do
@@ -323,6 +335,55 @@
%% heavily overloaded, clients can still write and read messages with
%% very low latency and not block at all.
%%
+%% Clients of the msg_store are required to register before using the
+%% msg_store. This provides them with the necessary client-side state
+%% to allow them to directly access the various caches and files. When
+%% they terminate, they should deregister. They can do this by calling
+%% either client_terminate/1 or client_delete_and_terminate/1. The
+%% differences are: (a) client_terminate is synchronous. As a result,
+%% if the msg_store is badly overloaded and has lots of in-flight
+%% writes and removes to process, this will take some time to
+%% return. However, once it does return, you can be sure that all the
+%% actions you've issued to the msg_store have been processed. (b) Not
+%% only is client_delete_and_terminate/1 asynchronous, but it also
+%% permits writes and subsequent removes from the current
+%% (terminating) client which are still in flight to be safely
+%% ignored. Thus from the point of view of the msg_store itself, and
+%% all from the same client:
+%%
+%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N
+%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 -->
+%%
+%% The client obviously sent T after all the other messages (up to
+%% W4), but because the msg_store prioritises messages, the T can be
+%% promoted and thus received early.
+%%
+%% Thus at the point of the msg_store receiving T, we have messages 1
+%% and 2 with a refcount of 1. After T, W3 will be ignored because
+%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be
+%% ignored because the messages that they refer to were already known
+%% to the msg_store prior to T. However, it can be a little more
+%% complex: after the first R2, the refcount of msg 2 is 0. At that
+%% point, if a GC occurs or file deletion, msg 2 could vanish, which
+%% would then mean that the subsequent W2 and R2 are then ignored.
+%%
+%% The use case then for client_delete_and_terminate/1 is if the
+%% client wishes to remove everything it's written to the msg_store:
+%% it issues removes for all messages it's written and not removed,
+%% and then calls client_delete_and_terminate/1. At that point, any
+%% in-flight writes (and subsequent removes) can be ignored, but
+%% removes and writes for messages the msg_store already knows about
+%% will continue to be processed normally (which will normally just
+%% involve modifying the reference count, which is fast). Thus we save
+%% disk bandwidth for writes which are going to be immediately removed
+%% again by the the terminating client.
+%%
+%% We use a separate set to keep track of the dying clients in order
+%% to keep that set, which is inspected on every write and remove, as
+%% small as possible. Inspecting client_refs - the set of all clients
+%% - would degrade performance with many healthy clients and few, if
+%% any, dying clients, which is the typical case.
+%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -361,6 +422,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
+ ok = server_cast(CState, {client_dying, Ref}),
ok = server_cast(CState, {client_delete, Ref}).
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
@@ -598,6 +660,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
+ dying_clients = sets:new(),
client_refs = ClientRefs1,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
@@ -643,6 +706,7 @@ prioritise_cast(Msg, _State) ->
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
{set_maximum_since_use, _Age} -> 8;
+ {client_dying, _Pid} -> 7;
_ -> 0
end.
@@ -681,15 +745,22 @@ handle_call({contains, Guid}, From, State) ->
State1 = contains_message(Guid, From, State),
noreply(State1).
+handle_cast({client_dying, CRef},
+ State = #msstate { dying_clients = DyingClients }) ->
+ DyingClients1 = sets:add_element(CRef, DyingClients),
+ write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+
handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs }) ->
- State1 = clear_client_callback(CRef, State),
- noreply(State1 #msstate {
- client_refs = sets:del_element(CRef, ClientRefs) });
+ State = #msstate { client_refs = ClientRefs,
+ dying_clients = DyingClients }) ->
+ State1 = clear_client_callback(
+ CRef, State #msstate {
+ client_refs = sets:del_element(CRef, ClientRefs),
+ dying_clients = sets:del_element(CRef, DyingClients) }),
+ noreply(remove_message(CRef, CRef, State1));
handle_cast({write, CRef, Guid},
- State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
+ State = #msstate { file_summary_ets = FileSummaryEts,
current_file = CurFile,
cur_file_cache_ets = CurFileCacheEts,
client_ondisk_callback = CODC,
@@ -705,41 +776,47 @@ handle_cast({write, CRef, Guid},
error -> CTG
end,
State1 = State #msstate { cref_to_guids = CTG1 },
- case index_lookup(Guid, State1) of
- not_found ->
+ case should_mask_action(CRef, Guid, State1) of
+ {true, _Location} ->
+ noreply(State1);
+ {false, not_found} ->
write_message(Guid, Msg, State1);
- #msg_location { ref_count = 0, file = File, total_size = TotalSize } ->
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
+ {Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }} ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
ok = index_delete(Guid, State1),
write_message(Guid, Msg, State1);
- [#file_summary {}] ->
- ok = index_update_ref_count(Guid, 1, State1),
- [_] = ets:update_counter(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, TotalSize}]),
- noreply(State1 #msstate {
- sum_valid_data = SumValid + TotalSize })
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently,
+ %% we'll have to write a new copy, which will then
+ %% be younger, so ignore this write.
+ noreply(State1);
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ noreply(adjust_valid_total_size(File, TotalSize, State))
end;
- #msg_location { ref_count = RefCount, file = File } ->
+ {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
ok = index_update_ref_count(Guid, RefCount + 1, State1),
CTG2 = case {dict:find(CRef, CODC), File} of
{{ok, _}, CurFile} -> CTG1;
- {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)),
+ {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid),
+ written),
CTG;
_ -> CTG1
end,
- noreply(State #msstate { cref_to_guids = CTG2 })
+ noreply(State1 #msstate { cref_to_guids = CTG2 })
end;
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
- fun (Guid, State2) -> remove_message(Guid, State2) end,
+ fun (Guid, State2) -> remove_message(Guid, CRef, State2) end,
State, Guids),
- State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
- noreply(maybe_compact(State2));
+ noreply(maybe_compact(
+ client_confirm(CRef, gb_sets:from_list(Guids), removed, State1)));
handle_cast({release, Guids}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -875,7 +952,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
true -> file_handle_cache:sync(CurHdl)
end,
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
+ [client_confirm(CRef, Guids, written, State1)
+ || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
@@ -990,34 +1068,43 @@ contains_message(Guid, From,
end
end.
-remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
- #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize } =
- index_lookup_positive_ref_count(Guid, State),
- %% only update field, otherwise bad interaction with concurrent GC
- Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here because
- %% there may be further writes in the mailbox for the same
- %% msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true } ] ->
- add_to_pending_gc_completion({remove, Guid}, File, State);
- [#file_summary {}] ->
+remove_message(Guid, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts }) ->
+ case should_mask_action(CRef, Guid, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg
+ %% whilst it's being GC'd. ASSERTION:
+ %% [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }} when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec =
+ fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from CUR_FILE_CACHE_ETS_NAME here
+ %% because there may be further writes in the mailbox
+ %% for the same msg.
+ 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, Guid, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(File, -TotalSize,
+ State))
+ end;
+ _ -> ok = decrement_cache(DedupCacheEts, Guid),
ok = Dec(),
- [_] = ets:update_counter(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, -TotalSize}]),
- delete_file_if_empty(
- File, State #msstate {
- sum_valid_data = SumValid - TotalSize })
- end;
- _ -> ok = decrement_cache(DedupCacheEts, Guid),
- ok = Dec(),
- State
+ State
+ end
end.
add_to_pending_gc_completion(
@@ -1039,8 +1126,8 @@ run_pending_action({read, Guid, From}, State) ->
read_message(Guid, From, State);
run_pending_action({contains, Guid, From}, State) ->
contains_message(Guid, From, State);
-run_pending_action({remove, Guid}, State) ->
- remove_message(Guid, State).
+run_pending_action({remove, Guid, CRef}, State) ->
+ remove_message(Guid, CRef, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
try
@@ -1051,15 +1138,22 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
+adjust_valid_total_size(File, Delta, State = #msstate {
+ sum_valid_data = SumValid,
+ file_summary_ets = FileSummaryEts }) ->
+ [_] = ets:update_counter(FileSummaryEts, File,
+ [{#file_summary.valid_total_size, Delta}]),
+ State #msstate { sum_valid_data = SumValid + Delta }.
+
orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-client_confirm(CRef, Guids,
+client_confirm(CRef, Guids, ActionTaken,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids),
+ {ok, Fun} -> Fun(Guids, ActionTaken),
CTG1 = case dict:find(CRef, CTG) of
{ok, Gs} ->
Guids1 = gb_sets:difference(Gs, Guids),
@@ -1073,6 +1167,29 @@ client_confirm(CRef, Guids,
error -> State
end.
+%% Detect whether the Guid is older or younger than the client's death
+%% msg (if there is one). If the msg is older than the client death
+%% msg, and it has a 0 ref_count we must only alter the ref_count, not
+%% rewrite the msg - rewriting it would make it younger than the death
+%% msg and thus should be ignored. Note that this (correctly) returns
+%% false when testing to remove the death msg itself.
+should_mask_action(CRef, Guid,
+ State = #msstate { dying_clients = DyingClients }) ->
+ case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of
+ {false, Location} ->
+ {false, Location};
+ {true, not_found} ->
+ {true, not_found};
+ {true, #msg_location { file = File, offset = Offset,
+ ref_count = RefCount } = Location} ->
+ #msg_location { file = DeathFile, offset = DeathOffset } =
+ index_lookup(CRef, State),
+ {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
+ {true, _} -> true;
+ {false, 0} -> false_if_increment;
+ {false, _} -> false
+ end, Location}
+ end.
%%----------------------------------------------------------------------------
%% file helper functions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 89954b06ef..c6a083bb7d 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -32,7 +32,7 @@
-module(rabbit_net).
-include("rabbit.hrl").
--export([is_ssl/1, controlling_process/2, getstat/2,
+-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
async_recv/3, port_command/2, send/2, close/1,
sockname/1, peername/1, peercert/1]).
@@ -50,6 +50,9 @@
-type(socket() :: port() | #ssl_socket{}).
-spec(is_ssl/1 :: (socket()) -> boolean()).
+-spec(ssl_info/1 :: (socket())
+ -> 'nossl' | ok_val_or_error(
+ {atom(), {atom(), atom(), atom()}})).
-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
-spec(getstat/2 ::
(socket(), [stat_option()])
@@ -77,6 +80,11 @@
is_ssl(Sock) -> ?IS_SSL(Sock).
+ssl_info(Sock) when ?IS_SSL(Sock) ->
+ ssl:connection_info(Sock#ssl_socket.ssl);
+ssl_info(_Sock) ->
+ nossl.
+
controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
controlling_process(Sock, Pid) when is_port(Sock) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6adcd8b0ce..9bee84f4c0 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -33,7 +33,7 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
+ publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -297,11 +297,12 @@ deliver(SeqIds, State) ->
ack(SeqIds, State) ->
deliver_or_ack(ack, SeqIds, State).
-sync([], State) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+%% This is only called when there are outstanding confirms and the
+%% queue is idle.
+sync(State = #qistate { unsynced_guids = Guids }) ->
+ sync_if([] =/= Guids, State).
+
+sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
%% only sync the journal if the pubs or acks appear in the
@@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% the variable queue publishes and acks to the qi, and then
%% syncs, all in one operation, there is no possibility of the
%% seqids not being in the journal, provided the transaction isn't
- %% emptied (handled above anyway).
- ok = file_handle_cache:sync(JournalHdl),
- notify_sync(State).
+ %% emptied (handled by sync_if anyway).
+ sync_if([] =/= SeqIds, State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+sync_if(false, State) ->
+ State;
+sync_if(_Bool, State = #qistate { journal_handle = undefined }) ->
+ State;
+sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ notify_sync(State).
+
notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
State #qistate { unsynced_guids = [] }.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e87ff87976..e9f34a0f8c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -65,6 +65,8 @@
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism,
+ ssl_protocol, ssl_key_exchange,
+ ssl_cipher, ssl_hash,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -768,17 +770,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- SendFun =
- fun() ->
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- catch rabbit_net:send(Sock, Frame)
- end,
-
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
- ReceiveFun =
- fun() ->
- Parent ! timeout
- end,
+ ReceiveFun = fun() -> Parent ! timeout end,
Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
@@ -905,6 +900,14 @@ i(peer_port, #v1{sock = Sock}) ->
socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
i(ssl, #v1{sock = Sock}) ->
rabbit_net:is_ssl(Sock);
+i(ssl_protocol, #v1{sock = Sock}) ->
+ ssl_info(fun ({P, _}) -> P end, Sock);
+i(ssl_key_exchange, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
+i(ssl_cipher, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
+i(ssl_hash, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
i(peer_cert_issuer, #v1{sock = Sock}) ->
cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
i(peer_cert_subject, #v1{sock = Sock}) ->
@@ -955,6 +958,13 @@ socket_info(Get, Select) ->
{error, _} -> ''
end.
+ssl_info(F, Sock) ->
+ case rabbit_net:ssl_info(Sock) of
+ nossl -> '';
+ {error, _} -> '';
+ {ok, Info} -> F(Info)
+ end.
+
cert_info(F, Sock) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8ceb441039..d913092cce 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1696,7 +1696,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
false -> ?TRANSIENT_MSG_STORE
end,
MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined),
- {A, B} =
+ {A, B = [{_SeqId, LastGuidWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
Guid = rabbit_guid:guid(),
@@ -1705,6 +1705,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
ok = rabbit_msg_store:write(Guid, Guid, MSCState),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]}
end, {Qi, []}, SeqIds),
+ %% do this just to force all of the publishes through to the msg_store:
+ true = rabbit_msg_store:contains(LastGuidWritten, MSCState),
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
{A, B}.
@@ -1888,7 +1890,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
VQ = rabbit_variable_queue:init(test_queue(), true, false,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1990,7 +1992,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2000,7 +2002,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2034,7 +2036,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2064,7 +2066,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2081,7 +2083,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2112,7 +2114,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
@@ -2174,3 +2176,4 @@ test_configurable_server_properties() ->
passed.
nop(_) -> ok.
+nop(_, _) -> ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 565c61e7d0..18423dd7b0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -412,7 +412,9 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids, ActionTaken) ->
+ msgs_written_to_disk(Self, Guids, ActionTaken)
+ end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
@@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, #basic_message { guid = Guid },
+ _MsgProps, State = #vqstate { len = 0 }) ->
+ blind_confirm(self(), gb_sets:singleton(Guid)),
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
@@ -531,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- unconfirmed = Unconfirmed1 }))}.
+ unconfirmed = UC1 }))}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- {Guids, State1} =
- ack(fun msg_store_remove/3,
- fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1);
- (#msg_status{msg = #basic_message { guid = Guid }}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1)
- end,
- AckTags, State),
- {Guids, a(State1)}.
+ a(ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
@@ -712,23 +710,22 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
- {_Guids, State1} =
- ack(fun msg_store_release/3,
- fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
- {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
- true, false, State1),
- State2;
- ({IsPersistent, Guid, MsgProps}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
- true, true, State2),
- State3
- end,
- AckTags, State),
- a(reduce_memory_use(State1)).
+ a(reduce_memory_use(
+ ack(fun msg_store_release/3,
+ fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
+ {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
+ true, false, State1),
+ State2;
+ ({IsPersistent, Guid, MsgProps}, State1) ->
+ #vqstate { msg_store_clients = MSCState } = State1,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, Guid),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
+ true, true, State2),
+ State3
+ end,
+ AckTags, State))).
len(#vqstate { len = Len }) -> Len.
@@ -812,17 +809,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State),
- Res;
-needs_idle_timeout(_State) ->
- true.
+needs_idle_timeout(State = #vqstate { on_sync = OnSync, unconfirmed = UC }) ->
+ case {OnSync, gb_sets:is_empty(UC)} of
+ {?BLANK_SYNC, true} ->
+ {Res, _State} = reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State),
+ Res;
+ _ ->
+ true
+ end.
-idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
+idle_timeout(State) ->
+ a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -1160,7 +1162,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- {_Guids, NewState} = ack(Acks, State),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
@@ -1172,7 +1173,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
{SeqId, State3} =
publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, NewState}, Pubs),
+ end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1236,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
persistent_count = PCount,
durable = IsDurable,
ram_msg_count = RamMsgCount,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
@@ -1246,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
ram_msg_count = RamMsgCount + 1,
- unconfirmed = Unconfirmed1 }}.
+ unconfirmed = UC1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
@@ -1323,7 +1324,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore, _AllGuids} =
+ {PersistentSeqIds, GuidsByStore} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1342,9 +1343,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
+ State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore, AllGuids},
+ {{PersistentSeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1364,24 +1365,21 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {lists:reverse(AllGuids),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }.
-accumulate_ack_init() -> {[], orddict:new(), []}.
+accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false,
- guid = Guid },
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
- {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
+ index_on_disk = false },
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
- [Guid | AllGuids]}.
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1393,6 +1391,11 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
+confirm_commit_index(State = #vqstate { unconfirmed = [] }) ->
+ State;
+confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }.
+
remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1403,7 +1406,13 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msgs_confirmed(GuidSet, State) ->
{gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-msgs_written_to_disk(QPid, GuidSet) ->
+blind_confirm(QPid, GuidSet) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
+
+msgs_written_to_disk(QPid, GuidSet, removed) ->
+ blind_confirm(QPid, GuidSet);
+msgs_written_to_disk(QPid, GuidSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,