summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl22
-rw-r--r--src/rabbit_mirror_queue_master.erl22
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl26
5 files changed, 42 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 014c36bc53..b314ddef1a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -223,15 +223,14 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
end
end, State, [{<<"x-expires">>, fun init_expires/2},
{<<"x-message-ttl">>, fun init_ttl/2},
- {<<"x-dead-letter-exchange">>,
- fun init_dlx/2}]).
+ {<<"x-dead-letter-exchange">>, fun init_dlx/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{
- virtual_host = VHostPath}}}) ->
+ virtual_host = VHostPath}}}) ->
State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}.
terminate_shutdown(Fun, State) ->
@@ -691,7 +690,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- dead_letter_callback_fun(expired, State),
+ mk_dead_letter_fun(expired, State),
BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
@@ -708,11 +707,11 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-dead_letter_callback_fun(_Reason, #q{dlx = undefined}) ->
- fun(_MsgFun, BQS) -> BQS end;
-dead_letter_callback_fun(Reason, State) ->
- fun(MsgFun, BQS) ->
- {Msg, BQS1} = MsgFun(BQS),
+mk_dead_letter_fun(_Reason, #q{dlx = undefined}) ->
+ fun(_MsgLookupFun, BQS) -> BQS end;
+mk_dead_letter_fun(Reason, State) ->
+ fun(MsgLookupFun, BQS) ->
+ {Msg, BQS1} = MsgLookupFun(BQS),
dead_letter_msg(Msg, Reason, State),
BQS1
end.
@@ -731,7 +730,7 @@ maybe_dead_letter_queue(Reason, State = #q{
end.
dead_letter_msg(Msg, Reason, State = #q{dlx = DLX}) ->
- Exchange = rabbit_exchange:lookup_or_die(DLX),
+ rabbit_exchange:lookup_or_die(DLX),
rabbit_basic:publish(
rabbit_basic:delivery(
@@ -1110,8 +1109,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
backing_queue_state = BQS}) ->
case Requeue of
true -> requeue_and_run(AckTags, State1);
- false -> Fun = dead_letter_callback_fun(rejected,
- State),
+ false -> Fun = mk_dead_letter_fun(rejected, State),
{_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS),
State1#q{backing_queue_state = BQS1}
end
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9f12b954bd..7b844b2036 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -172,12 +172,12 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Pred, DropFun, State = #state{gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+dropwhile(Pred, MsgFun, State = #state{gm = GM,
+ backing_queue = BQ,
+ set_delivered = SetDelivered,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- BQS1 = BQ:dropwhile(Pred, DropFun, BQS),
+ BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
Dropped = Len - BQ:len(BQS1),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
@@ -235,15 +235,15 @@ fetch(AckRequired, State = #state { gm = GM,
ack_msg_id = AM1 }}
end.
-ack(AckTags, Fun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
- {MsgIds, BQS1} = BQ:ack(AckTags, Fun, BQS),
+ack(AckTags, MsgFun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
+ {MsgIds, BQS1} = BQ:ack(AckTags, MsgFun, BQS),
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, Fun, MsgIds})
+ _ -> ok = gm:broadcast(GM, {ack, MsgFun, MsgIds})
end,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 33d7da58fb..7238b16968 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -818,12 +818,12 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
%% we must be shorter than the master
State
end};
-process_instruction({ack, Fun, MsgIds},
+process_instruction({ack, MsgFun, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {MsgIds1, BQS1} = BQ:ack(AckTags, Fun, BQS),
+ {MsgIds1, BQS1} = BQ:ack(AckTags, MsgFun, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2d55d133a4..6e5e86c6a7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2335,7 +2335,7 @@ test_dropwhile(VQ0) ->
fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
end,
- dummy_msg_lookup_fun(),
+ dummy_msg_fun(),
VQ1),
%% fetch five now
@@ -2350,18 +2350,18 @@ test_dropwhile(VQ0) ->
VQ4.
-dummy_msg_lookup_fun() ->
+dummy_msg_fun() ->
fun(_Fun, State) -> State end.
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
VQ3 = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, dummy_msg_lookup_fun(), VQ2),
+ fun(_) -> false end, dummy_msg_fun(), VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
rabbit_variable_queue:dropwhile(
- fun(_) -> false end, dummy_msg_lookup_fun(), VQ5).
+ fun(_) -> false end, dummy_msg_fun(), VQ5).
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -2386,8 +2386,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags,
- dummy_msg_lookup_fun(), VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, dummy_msg_fun(), VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2397,8 +2396,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag],
- dummy_msg_lookup_fun(), VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], dummy_msg_fun(), VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2433,7 +2431,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
{_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
- dummy_msg_lookup_fun(), VQ8),
+ dummy_msg_fun(), VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bf9450f1b1..3e34684863 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -581,15 +581,17 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, DropFun, State) ->
+dropwhile(Pred, MsgFun, State) ->
case queue_out(State) of
{empty, State1} ->
a(State1);
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
- true -> State2 = DropFun(read_msg_callback(MsgStatus), State1),
- dropwhile(Pred, DropFun, State2);
- false -> a(in_r(MsgStatus, State1))
+ true ->
+ State2 = MsgFun(read_msg_callback(MsgStatus), State1),
+ dropwhile(Pred, MsgFun, State2);
+ false ->
+ a(in_r(MsgStatus, State1))
end
end.
@@ -609,17 +611,13 @@ fetch(AckRequired, State) ->
read_msg_callback(#msg_status { msg = undefined,
msg_id = MsgId,
is_persistent = IsPersistent }) ->
- fun(State) ->
- read_msg_callback1(MsgId, IsPersistent, State)
- end;
-read_msg_callback(#msg_status{ msg = Msg}) ->
- fun(State) ->
- {Msg, State}
- end;
+ fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end;
+
+read_msg_callback(#msg_status{ msg = Msg }) ->
+ fun(State) -> {Msg, State} end;
+
read_msg_callback({IsPersistent, MsgId, _MsgProps}) ->
- fun(State) ->
- read_msg_callback1(MsgId, IsPersistent, State)
- end.
+ fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end.
read_msg_callback1(MsgId, IsPersistent,
State = #vqstate{ msg_store_clients = MSCState }) ->