diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
1 files changed, 12 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cf82c639cc..bc5941c54b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -569,7 +569,7 @@ dropwhile1(Pred, DropFun, State) -> case Pred(MsgProps) of true -> {MsgStatus1, State2} = - DropFun(read_msg_callback(), {MsgStatus, State1}), + DropFun(fun read_msg_callback/1, {MsgStatus, State1}), {_, State3} = internal_fetch(false, MsgStatus1, State2), dropwhile1(Pred, DropFun, State3); @@ -608,18 +608,16 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. -read_msg_callback() -> - fun({MsgStatus = #msg_status {}, State}) -> - {MsgStatus1 = #msg_status { msg = Msg }, State1} = - read_msg(MsgStatus, State), - {Msg, {MsgStatus1, State1}}; - ({{IsPersistent, MsgId, _MsgProps}, State}) -> - #vqstate { msg_store_clients = MSCState } = State, - {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, {undefined, State #vqstate { - msg_store_clients = MSCState1 }}} - end. +read_msg_callback({MsgStatus = #msg_status {}, State}) -> + {MsgStatus1 = #msg_status { msg = Msg }, State1} = + read_msg(MsgStatus, State), + {Msg, {MsgStatus1, State1}}; +read_msg_callback({{IsPersistent, MsgId, _MsgProps}, State}) -> + #vqstate { msg_store_clients = MSCState } = State, + {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, {undefined, State #vqstate { + msg_store_clients = MSCState1 }}}. read_msg(MsgStatus = #msg_status { msg = undefined, msg_id = MsgId, @@ -689,7 +687,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { ack(AckTags, Fun, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, fun (AckEntry, State0) -> - {_, State2} = Fun(read_msg_callback(), + {_, State2} = Fun(fun read_msg_callback/1, {AckEntry, State0}), State2 end, |
