diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-09-23 13:02:58 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-09-23 13:02:58 +0100 |
| commit | 8160a85527a357144234a39f49d9d7d485672430 (patch) | |
| tree | 41cafabeab0918ded9057536f89981d0669ab6cc /src | |
| parent | 42d17886308efbc8286d685efbe54d0fdea560f3 (diff) | |
| download | rabbitmq-server-git-8160a85527a357144234a39f49d9d7d485672430.tar.gz | |
invariable queue has dropwhile implementation now
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4626b513e7..8eb6ebbd73 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -118,18 +118,31 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. -dropwhile(Pred, State) -> - State. +dropwhile(_Pred, State = #iv_state { len = 0 }) -> + State; +dropwhile(Pred, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + case Pred(Msg, MsgProps) of + true -> + {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), + dropwhile(Pred, State1); + false -> + State + end. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { len = Len, - queue = Q, - qname = QName, - durable = IsDurable, - pending_ack = PA }) -> - {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}}, - Q1} = queue:out(Q), +fetch(AckRequired, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). + +fetch_internal(AckRequired, Q1, + Msg = #basic_message {guid = Guid}, + MsgProps, IsDelivered, + State = #iv_state { len = Len, + qname = QName, + durable = IsDurable, + pending_ack = PA }) -> Len1 = Len - 1, ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = store_ack(Msg, MsgProps, PA), @@ -139,7 +152,7 @@ fetch(AckRequired, State = #iv_state { len = Len, [Guid], PA1), {blank_ack, PA} end, - {{Msg, MsgProps, IsDelivered, AckTag, Len1}, + {{Msg, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, |
