summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl6
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_variable_queue.erl36
3 files changed, 30 insertions, 22 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 0044174c49..fa90eef4b9 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -28,12 +28,12 @@
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')).
--type(msg_lookup_result() :: {rabbit_types:basic_message(), {any(), state()}}).
+-type(msg_lookup_result() :: {rabbit_types:basic_message(), state()}).
--type(msg_lookup_fun() :: fun((any(), state()) -> msg_lookup_result())).
+-type(msg_lookup_fun() :: fun((state()) -> msg_lookup_result())).
-type(msg_lookup_callback() ::
- fun((msg_lookup_fun(), {A, state()}) -> {A, state()})).
+ fun((msg_lookup_fun(), state()) -> state())).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index db812b80e4..143a043de3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -191,7 +191,7 @@ init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
init_dlx(DLE, State = #q{q = #amqqueue{name = #resource{
- virtual_host = VHostPath}}}) ->
+ virtual_host = VHostPath}}}) ->
State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLE)}.
terminate_shutdown(Fun, State) ->
@@ -745,12 +745,12 @@ ensure_ttl_timer(State) ->
State.
dead_letter_callback_fun(_Reason, #q{dlx = undefined}) ->
- fun(_MsgFun, LookupState) -> LookupState end;
+ fun(_MsgFun, BQS) -> BQS end;
dead_letter_callback_fun(Reason, State) ->
- fun(MsgFun, LookupState) ->
- {Msg, LookupState1} = MsgFun(LookupState),
+ fun(MsgFun, BQS) ->
+ {Msg, BQS1} = MsgFun(BQS),
dead_letter_msg(Msg, Reason, State),
- LookupState1
+ BQS1
end.
maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bc5941c54b..75be6a8218 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -568,10 +568,8 @@ dropwhile1(Pred, DropFun, State) ->
fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
case Pred(MsgProps) of
true ->
- {MsgStatus1, State2} =
- DropFun(fun read_msg_callback/1, {MsgStatus, State1}),
-
- {_, State3} = internal_fetch(false, MsgStatus1, State2),
+ State2 = DropFun(read_msg_callback(MsgStatus), State1),
+ {_, State3} = internal_fetch(false, MsgStatus, State2),
dropwhile1(Pred, DropFun, State3);
false ->
{ok, in_r(MsgStatus, State1)}
@@ -608,16 +606,26 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
Fun(MsgStatus, State #vqstate { q4 = Q4a })
end.
-read_msg_callback({MsgStatus = #msg_status {}, State}) ->
- {MsgStatus1 = #msg_status { msg = Msg }, State1} =
- read_msg(MsgStatus, State),
- {Msg, {MsgStatus1, State1}};
-read_msg_callback({{IsPersistent, MsgId, _MsgProps}, State}) ->
- #vqstate { msg_store_clients = MSCState } = State,
+read_msg_callback(#msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent }) ->
+ fun(State) ->
+ read_msg_callback1(MsgId, IsPersistent, State)
+ end;
+read_msg_callback(#msg_status{ msg = Msg}) ->
+ fun(State) ->
+ {Msg, State}
+ end;
+read_msg_callback({IsPersistent, MsgId, _MsgProps}) ->
+ fun(State) ->
+ read_msg_callback1(MsgId, IsPersistent, State)
+ end.
+
+read_msg_callback1(MsgId, IsPersistent,
+ State = #vqstate{ msg_store_clients = MSCState }) ->
{{ok, Msg = #basic_message{}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {Msg, {undefined, State #vqstate {
- msg_store_clients = MSCState1 }}}.
+ {Msg, State #vqstate { msg_store_clients = MSCState1 }}.
read_msg(MsgStatus = #msg_status { msg = undefined,
msg_id = MsgId,
@@ -687,8 +695,8 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
ack(AckTags, Fun, State) ->
{MsgIds, State1} = ack(fun msg_store_remove/3,
fun (AckEntry, State0) ->
- {_, State2} = Fun(fun read_msg_callback/1,
- {AckEntry, State0}),
+ State2 = Fun(read_msg_callback(AckEntry),
+ State0),
State2
end,
AckTags, State),