diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
5 files changed, 42 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 014c36bc53..b314ddef1a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -223,15 +223,14 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> end end, State, [{<<"x-expires">>, fun init_expires/2}, {<<"x-message-ttl">>, fun init_ttl/2}, - {<<"x-dead-letter-exchange">>, - fun init_dlx/2}]). + {<<"x-dead-letter-exchange">>, fun init_dlx/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{ - virtual_host = VHostPath}}}) -> + virtual_host = VHostPath}}}) -> State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}. terminate_shutdown(Fun, State) -> @@ -691,7 +690,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - dead_letter_callback_fun(expired, State), + mk_dead_letter_fun(expired, State), BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). @@ -708,11 +707,11 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. -dead_letter_callback_fun(_Reason, #q{dlx = undefined}) -> - fun(_MsgFun, BQS) -> BQS end; -dead_letter_callback_fun(Reason, State) -> - fun(MsgFun, BQS) -> - {Msg, BQS1} = MsgFun(BQS), +mk_dead_letter_fun(_Reason, #q{dlx = undefined}) -> + fun(_MsgLookupFun, BQS) -> BQS end; +mk_dead_letter_fun(Reason, State) -> + fun(MsgLookupFun, BQS) -> + {Msg, BQS1} = MsgLookupFun(BQS), dead_letter_msg(Msg, Reason, State), BQS1 end. @@ -731,7 +730,7 @@ maybe_dead_letter_queue(Reason, State = #q{ end. dead_letter_msg(Msg, Reason, State = #q{dlx = DLX}) -> - Exchange = rabbit_exchange:lookup_or_die(DLX), + rabbit_exchange:lookup_or_die(DLX), rabbit_basic:publish( rabbit_basic:delivery( @@ -1110,8 +1109,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> backing_queue_state = BQS}) -> case Requeue of true -> requeue_and_run(AckTags, State1); - false -> Fun = dead_letter_callback_fun(rejected, - State), + false -> Fun = mk_dead_letter_fun(rejected, State), {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS), State1#q{backing_queue_state = BQS1} end diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9f12b954bd..7b844b2036 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -172,12 +172,12 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Pred, DropFun, State = #state{gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - set_delivered = SetDelivered }) -> +dropwhile(Pred, MsgFun, State = #state{gm = GM, + backing_queue = BQ, + set_delivered = SetDelivered, + backing_queue_state = BQS }) -> Len = BQ:len(BQS), - BQS1 = BQ:dropwhile(Pred, DropFun, BQS), + BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), Dropped = Len - BQ:len(BQS1), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), @@ -235,15 +235,15 @@ fetch(AckRequired, State = #state { gm = GM, ack_msg_id = AM1 }} end. -ack(AckTags, Fun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> - {MsgIds, BQS1} = BQ:ack(AckTags, Fun, BQS), +ack(AckTags, MsgFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + {MsgIds, BQS1} = BQ:ack(AckTags, MsgFun, BQS), AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; - _ -> ok = gm:broadcast(GM, {ack, Fun, MsgIds}) + _ -> ok = gm:broadcast(GM, {ack, MsgFun, MsgIds}) end, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 33d7da58fb..7238b16968 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -818,12 +818,12 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, %% we must be shorter than the master State end}; -process_instruction({ack, Fun, MsgIds}, +process_instruction({ack, MsgFun, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {MsgIds1, BQS1} = BQ:ack(AckTags, Fun, BQS), + {MsgIds1, BQS1} = BQ:ack(AckTags, MsgFun, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2d55d133a4..6e5e86c6a7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2335,7 +2335,7 @@ test_dropwhile(VQ0) -> fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 end, - dummy_msg_lookup_fun(), + dummy_msg_fun(), VQ1), %% fetch five now @@ -2350,18 +2350,18 @@ test_dropwhile(VQ0) -> VQ4. -dummy_msg_lookup_fun() -> +dummy_msg_fun() -> fun(_Fun, State) -> State end. test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), VQ3 = rabbit_variable_queue:dropwhile( - fun(_) -> false end, dummy_msg_lookup_fun(), VQ2), + fun(_) -> false end, dummy_msg_fun(), VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), rabbit_variable_queue:dropwhile( - fun(_) -> false end, dummy_msg_lookup_fun(), VQ5). + fun(_) -> false end, dummy_msg_fun(), VQ5). test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -2386,8 +2386,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, - dummy_msg_lookup_fun(), VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, dummy_msg_fun(), VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2397,8 +2396,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], - dummy_msg_lookup_fun(), VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], dummy_msg_fun(), VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2433,7 +2431,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, - dummy_msg_lookup_fun(), VQ8), + dummy_msg_fun(), VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bf9450f1b1..3e34684863 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -581,15 +581,17 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, DropFun, State) -> +dropwhile(Pred, MsgFun, State) -> case queue_out(State) of {empty, State1} -> a(State1); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of - true -> State2 = DropFun(read_msg_callback(MsgStatus), State1), - dropwhile(Pred, DropFun, State2); - false -> a(in_r(MsgStatus, State1)) + true -> + State2 = MsgFun(read_msg_callback(MsgStatus), State1), + dropwhile(Pred, MsgFun, State2); + false -> + a(in_r(MsgStatus, State1)) end end. @@ -609,17 +611,13 @@ fetch(AckRequired, State) -> read_msg_callback(#msg_status { msg = undefined, msg_id = MsgId, is_persistent = IsPersistent }) -> - fun(State) -> - read_msg_callback1(MsgId, IsPersistent, State) - end; -read_msg_callback(#msg_status{ msg = Msg}) -> - fun(State) -> - {Msg, State} - end; + fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end; + +read_msg_callback(#msg_status{ msg = Msg }) -> + fun(State) -> {Msg, State} end; + read_msg_callback({IsPersistent, MsgId, _MsgProps}) -> - fun(State) -> - read_msg_callback1(MsgId, IsPersistent, State) - end. + fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end. read_msg_callback1(MsgId, IsPersistent, State = #vqstate{ msg_store_clients = MSCState }) -> |
