diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 28 |
1 files changed, 13 insertions, 15 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 64285be9aa..a4a2d655cb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -615,19 +615,13 @@ fetch(AckRequired, State) -> read_msg_callback(#msg_status { msg = undefined, msg_id = MsgId, is_persistent = IsPersistent }) -> - fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end; + fun(State) -> read_msg_common(MsgId, IsPersistent, State) end; read_msg_callback(#msg_status{ msg = Msg }) -> fun(State) -> {Msg, State} end; read_msg_callback({IsPersistent, MsgId, _MsgProps}) -> - fun(State) -> read_msg_callback1(MsgId, IsPersistent, State) end. - -read_msg_callback1(MsgId, IsPersistent, - State = #vqstate{ msg_store_clients = MSCState }) -> - {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate { msg_store_clients = MSCState1 }}. + fun(State) -> read_msg_common(MsgId, IsPersistent, State) end. ack([], _Fun, State) -> {[], State}; @@ -1068,16 +1062,20 @@ queue_out(State = #vqstate { q4 = Q4 }) -> read_msg(MsgStatus = #msg_status { msg = undefined, msg_id = MsgId, is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {MsgStatus #msg_status { msg = Msg }, - State #vqstate { ram_msg_count = RamMsgCount + 1, - msg_store_clients = MSCState1 }}; + State) -> + {Msg, State1} = read_msg_common(MsgId, IsPersistent, State), + {MsgStatus #msg_status { msg = Msg }, State1}; read_msg(MsgStatus, State) -> {MsgStatus, State}. +read_msg_common(MsgId, IsPersistent, + State = #vqstate{ ram_msg_count = RamMsgCount, + msg_store_clients = MSCState }) -> + {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, State #vqstate { ram_msg_count = RamMsgCount + 1, + msg_store_clients = MSCState1 }}. + internal_fetch(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, |
