summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-17 13:42:41 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-17 13:42:41 +0100
commit453b60dc6a45ef50209f5831f70bbd2a330ee88f (patch)
tree045e41fb16f364b6cf331ecd68cc0974f8c7e3c3
parentc8c51ed380ab92e3bac4cf38cfccc8958d98cada (diff)
downloadrabbitmq-server-git-453b60dc6a45ef50209f5831f70bbd2a330ee88f.tar.gz
invariable queue updated for backing queue api
-rw-r--r--src/rabbit_invariable_queue.erl47
1 files changed, 30 insertions, 17 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 44489bf939..3eea7becce 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -32,8 +32,8 @@
-module(rabbit_invariable_queue).
-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
- publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
- tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
+ publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
+ tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
@@ -92,10 +92,10 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered},
Acc) ->
Acc;
- ({Msg = #basic_message { guid = Guid }, _MsgProps, IsDelivered},
+ ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered},
{AckTagsN, PAN}) ->
ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
+ {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)}
end, {[], dict:new()}, Q),
ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
@@ -105,17 +105,18 @@ publish(Msg, MsgProps, State = #iv_state { queue = Q,
durable = IsDurable,
len = Len }) ->
ok = persist_message(QName, IsDurable, none, Msg),
- QueueItem = {Msg, MsgProps, false},
- State #iv_state { queue = queue:in(QueueItem, Q), len = Len + 1 }.
+ Q1 = enqueue(Msg, MsgProps, false, Q),
+ State #iv_state { queue = Q1, len = Len + 1 }.
-publish_delivered(false, _Msg, State) ->
+publish_delivered(false, _Msg, _MsgProps, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
+ MsgProps,
State = #iv_state { qname = QName, durable = IsDurable,
len = 0, pending_ack = PA }) ->
ok = persist_message(QName, IsDurable, none, Msg),
ok = persist_delivery(QName, IsDurable, false, Msg),
- {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
+ {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
fetch(_AckRequired, State = #iv_state { len = 0 }) ->
{empty, State};
@@ -126,7 +127,7 @@ fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
Q1} = queue:out(Q),
Len1 = Len - 1,
ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- PA1 = dict:store(Guid, Msg, PA),
+ PA1 = store_ack(Msg, MsgProps, PA),
{AckTag, PA2} = case AckRequired of
true -> {Guid, PA1};
false -> ok = persist_acks(QName, IsDurable, none,
@@ -142,10 +143,10 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, State = #iv_state { qname = QName,
+tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName,
durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
+ store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
ok = persist_message(QName, IsDurable, Txn, Msg),
State.
@@ -163,7 +164,8 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) ->
erase_tx(Txn),
{lists:flatten(AckTags), State}.
-tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
+tx_commit(Txn, Fun, MsgPropsFun,
+ State = #iv_state { qname = QName, pending_ack = PA,
queue = Q, len = Len }) ->
#tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn),
ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
@@ -172,12 +174,14 @@ tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
Fun(),
AckTags1 = lists:flatten(AckTags),
PA1 = remove_acks(AckTags1, PA),
- {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) ->
- {queue:in({Msg, false}, QN), LenN + 1}
+ {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) ->
+ MsgProps1 = MsgPropsFun(MsgProps),
+ QN = enqueue(Msg, MsgProps1, false, Q),
+ {QN, LenN + 1}
end, {Q, Len}, PubsRev),
{AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
-requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
+requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q,
len = Len }) ->
%% We don't need to touch the persister here - the persister will
%% already have these messages published and delivered as
@@ -190,12 +194,18 @@ requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
%% order to the last known state of our queue, prior to shutdown.
{Q1, Len1} = lists:foldl(
fun (Guid, {QN, LenN}) ->
- {ok, Msg = #basic_message {}} = dict:find(Guid, PA),
- {queue:in({Msg, true}, QN), LenN + 1}
+ {ok, {Msg = #basic_message {}, MsgProps}}
+ = dict:find(Guid, PA),
+ MsgProps1 = MsgPropsFun(MsgProps),
+ {enqueue(Msg, MsgProps1, true, QN), LenN + 1}
end, {Q, Len}, AckTags),
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
+enqueue(Msg, MsgProps, IsDelivered, Q) ->
+ I = {Msg, MsgProps, IsDelivered},
+ queue:in(I, Q).
+
len(#iv_state { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
@@ -216,6 +226,9 @@ status(_State) -> [].
remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags).
+store_ack(Msg = #basic_message { guid = Guid}, MsgProps, PA) ->
+ dict:store(Guid, {Msg, MsgProps}, PA).
+
%%----------------------------------------------------------------------------
lookup_tx(Txn) ->