diff options
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 36 |
3 files changed, 30 insertions, 22 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 0044174c49..fa90eef4b9 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -28,12 +28,12 @@ -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')). --type(msg_lookup_result() :: {rabbit_types:basic_message(), {any(), state()}}). +-type(msg_lookup_result() :: {rabbit_types:basic_message(), state()}). --type(msg_lookup_fun() :: fun((any(), state()) -> msg_lookup_result())). +-type(msg_lookup_fun() :: fun((state()) -> msg_lookup_result())). -type(msg_lookup_callback() :: - fun((msg_lookup_fun(), {A, state()}) -> {A, state()})). + fun((msg_lookup_fun(), state()) -> state())). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index db812b80e4..143a043de3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -191,7 +191,7 @@ init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). init_dlx(DLE, State = #q{q = #amqqueue{name = #resource{ - virtual_host = VHostPath}}}) -> + virtual_host = VHostPath}}}) -> State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLE)}. terminate_shutdown(Fun, State) -> @@ -745,12 +745,12 @@ ensure_ttl_timer(State) -> State. dead_letter_callback_fun(_Reason, #q{dlx = undefined}) -> - fun(_MsgFun, LookupState) -> LookupState end; + fun(_MsgFun, BQS) -> BQS end; dead_letter_callback_fun(Reason, State) -> - fun(MsgFun, LookupState) -> - {Msg, LookupState1} = MsgFun(LookupState), + fun(MsgFun, BQS) -> + {Msg, BQS1} = MsgFun(BQS), dead_letter_msg(Msg, Reason, State), - LookupState1 + BQS1 end. maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bc5941c54b..75be6a8218 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -568,10 +568,8 @@ dropwhile1(Pred, DropFun, State) -> fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> case Pred(MsgProps) of true -> - {MsgStatus1, State2} = - DropFun(fun read_msg_callback/1, {MsgStatus, State1}), - - {_, State3} = internal_fetch(false, MsgStatus1, State2), + State2 = DropFun(read_msg_callback(MsgStatus), State1), + {_, State3} = internal_fetch(false, MsgStatus, State2), dropwhile1(Pred, DropFun, State3); false -> {ok, in_r(MsgStatus, State1)} @@ -608,16 +606,26 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. -read_msg_callback({MsgStatus = #msg_status {}, State}) -> - {MsgStatus1 = #msg_status { msg = Msg }, State1} = - read_msg(MsgStatus, State), - {Msg, {MsgStatus1, State1}}; -read_msg_callback({{IsPersistent, MsgId, _MsgProps}, State}) -> - #vqstate { msg_store_clients = MSCState } = State, +read_msg_callback(#msg_status { msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent }) -> + fun(State) -> + read_msg_callback1(MsgId, IsPersistent, State) + end; +read_msg_callback(#msg_status{ msg = Msg}) -> + fun(State) -> + {Msg, State} + end; +read_msg_callback({IsPersistent, MsgId, _MsgProps}) -> + fun(State) -> + read_msg_callback1(MsgId, IsPersistent, State) + end. + +read_msg_callback1(MsgId, IsPersistent, + State = #vqstate{ msg_store_clients = MSCState }) -> {{ok, Msg = #basic_message{}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, {undefined, State #vqstate { - msg_store_clients = MSCState1 }}}. + {Msg, State #vqstate { msg_store_clients = MSCState1 }}. read_msg(MsgStatus = #msg_status { msg = undefined, msg_id = MsgId, @@ -687,8 +695,8 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { ack(AckTags, Fun, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, fun (AckEntry, State0) -> - {_, State2} = Fun(fun read_msg_callback/1, - {AckEntry, State0}), + State2 = Fun(read_msg_callback(AckEntry), + State0), State2 end, AckTags, State), |
