diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 111 |
1 files changed, 68 insertions, 43 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3ce889fba5..dd92256146 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -653,25 +653,28 @@ ack([], State) -> %% optimisation: this head is essentially a partial evaluation of the %% general case below, for the single-ack case. ack([SeqId], State) -> - {#msg_status { msg_id = MsgId, - is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - ack_out_counter = AckOutCount }} = - remove_pending_ack(true, SeqId, State), - IndexState1 = case IndexOnDisk of - true -> rabbit_queue_index:ack([SeqId], IndexState); - false -> IndexState - end, - case MsgInStore of - true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); - false -> ok - end, - {[MsgId], - a(State1 #vqstate { index_state = IndexState1, - ack_out_counter = AckOutCount + 1 })}; + case remove_pending_ack(true, SeqId, State) of + {none, _} -> + State; + {#msg_status { msg_id = MsgId, + is_persistent = IsPersistent, + msg_in_store = MsgInStore, + index_on_disk = IndexOnDisk }, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + ack_out_counter = AckOutCount }} -> + IndexState1 = case IndexOnDisk of + true -> rabbit_queue_index:ack([SeqId], IndexState); + false -> IndexState + end, + case MsgInStore of + true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); + false -> ok + end, + {[MsgId], + a(State1 #vqstate { index_state = IndexState1, + ack_out_counter = AckOutCount + 1 })} + end; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, @@ -679,8 +682,12 @@ ack(AckTags, State) -> ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2}) -> - {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2), - {accumulate_ack(MsgStatus, Acc), State3} + case remove_pending_ack(true, SeqId, State2) of + {none, _} -> + {Acc, State2}; + {MsgStatus, State3} -> + {accumulate_ack(MsgStatus, Acc), State3} + end end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), remove_msgs_by_id(MsgIdsByStore, MSCState), @@ -1998,8 +2005,12 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, %% First parameter = UpdateStats remove_pending_ack(true, SeqId, State) -> - {MsgStatus, State1} = remove_pending_ack(false, SeqId, State), - {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}; + case remove_pending_ack(false, SeqId, State) of + {none, _} -> + {none, State}; + {MsgStatus, State1} -> + {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)} + end; remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, disk_pending_ack = DPA, qi_pending_ack = QPA}) -> @@ -2011,9 +2022,13 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, DPA1 = gb_trees:delete(SeqId, DPA), {V, State#vqstate{disk_pending_ack = DPA1}}; none -> - QPA1 = gb_trees:delete(SeqId, QPA), - {gb_trees:get(SeqId, QPA), - State#vqstate{qi_pending_ack = QPA1}} + case gb_trees:lookup(SeqId, QPA) of + {value, V} -> + QPA1 = gb_trees:delete(SeqId, QPA), + {V, State#vqstate{qi_pending_ack = QPA1}}; + none -> + {none, State} + end end end. @@ -2164,11 +2179,15 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, PubFun, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids - {MsgStatus, State1} = msg_from_pending_ack(SeqId, State), - {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = - PubFun(MsgStatus, State1), - queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) + case msg_from_pending_ack(SeqId, State) of + {none, _} -> + queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State); + {MsgStatus, State1} -> + {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = + PubFun(MsgStatus, State1), + queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], + Limit, PubFun, State2) + end end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -2177,22 +2196,28 @@ queue_merge(SeqIds, Q, Front, MsgIds, delta_merge([], Delta, MsgIds, State) -> {Delta, MsgIds, State}; delta_merge(SeqIds, Delta, MsgIds, State) -> - lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> - {#msg_status { msg_id = MsgId } = MsgStatus, State1} = - msg_from_pending_ack(SeqId, State0), - {_MsgStatus, State2} = - maybe_prepare_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - stats({1, -1}, {MsgStatus, none}, State2)} + lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0} = Acc) -> + case msg_from_pending_ack(SeqId, State0) of + {none, _} -> + Acc; + {#msg_status { msg_id = MsgId } = MsgStatus, State1} -> + {_MsgStatus, State2} = + maybe_prepare_write_to_disk(true, true, MsgStatus, State1), + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], + stats({1, -1}, {MsgStatus, none}, State2)} + end end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, State) -> - {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(false, SeqId, State), - {MsgStatus #msg_status { - msg_props = MsgProps #message_properties { needs_confirming = false } }, - State1}. + case remove_pending_ack(false, SeqId, State) of + {none, _} -> + {none, State}; + {#msg_status { msg_props = MsgProps } = MsgStatus, State1} -> + {MsgStatus #msg_status { + msg_props = MsgProps #message_properties { needs_confirming = false } }, + State1} + end. beta_limit(Q) -> case ?QUEUE:peek(Q) of |
