diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 9 |
2 files changed, 7 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b8b0cf8d1b..482d2a3769 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -399,7 +399,7 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. -confirm_messages_internal(Guids, State) -> +confirm_messages_internal(Guids, State) when is_list(Guids) -> lists:foldl(fun(Guid, State0) -> confirm_message_internal(Guid, State0) end, State, Guids). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index efdf34f236..e3a0898992 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -768,8 +768,11 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) -> lists:foldl(fun(SeqId, Guids) -> - {ok, #msg_status { msg = Msg }} = dict:find(SeqId, PA), - [Msg#basic_message.guid | Guids] + {ok, AckEntry} = dict:find(SeqId, PA), + [case AckEntry of + #msg_status { msg = Msg } -> Msg#basic_message.guid; + {_, Guid} -> Guid + end | Guids] end, [], SeqIds). %%---------------------------------------------------------------------------- @@ -984,7 +987,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { IsPersistent1 = IsDurable andalso IsPersistent, {SeqId, State3} = publish(Msg, false, IsPersistent1, false, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, element(1, ack(Acks, State))}, Pubs), + end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( |
