summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-08-19 11:21:00 +0100
committerRob Harrop <rharrop@vmware.com>2010-08-19 11:21:00 +0100
commit4f47fdefbc9b0189522a7e6b96b8a5a84eaf5ae9 (patch)
tree99c8b3d388a5236767dd4aa21926ad2dedb46280
parent9994b41afc949a19ada7afc2b3550ba04ad58a64 (diff)
downloadrabbitmq-server-git-4f47fdefbc9b0189522a7e6b96b8a5a84eaf5ae9.tar.gz
Expiry is reset on tx commit now. TX publish messages have correct TTL semantics
-rw-r--r--src/rabbit_amqqueue_process.erl37
-rw-r--r--src/rabbit_variable_queue.erl9
2 files changed, 25 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index abd09a6343..2810a15622 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -450,6 +450,19 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
maybe_run_queue_via_backing_queue(
fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State).
+fetch(AckRequired, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ case BQ:fetch(AckRequired, BQS) of
+ {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}};
+ {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} ->
+ case msg_expired(MsgProperties) of
+ true ->
+ fetch(AckRequired, State#q{backing_queue_state = BQS1});
+ false ->
+ {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}}
+ end
+ end.
+
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
remove_consumer(ChPid, ConsumerTag, Queue) ->
@@ -565,6 +578,12 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
+msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) ->
+ false;
+msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ Now > Expiry.
+
reset_msg_expiry_fun(State) ->
fun(MsgProps) ->
MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
@@ -621,24 +640,6 @@ emit_stats(State) ->
[{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
%---------------------------------------------------------------------------
-fetch(AckRequired, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
- case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} = Result -> {empty, State#q{backing_queue_state = BQS1}};
- {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} ->
- case msg_expired(MsgProperties) of
- true ->
- fetch(AckRequired, State#q{backing_queue_state = BQS1});
- false ->
- {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}}
- end
- end.
-
-msg_expired(MsgProperties = #msg_properties{expiry = undefined}) ->
- false;
-msg_expired(MsgProperties = #msg_properties{expiry=Expiry}) ->
- Now = timer:now_diff(now(), {0,0,0}),
- Now > Expiry.
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 591052a601..4e978fd5ad 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -612,7 +612,11 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
- PubsOrdered = lists:reverse(Pubs),
+ F = fun({Msg, MsgProperties}) ->
+ {Msg, MsgPropsFun(MsgProperties)}
+ end,
+ PubsProcessed = lists:map(F, Pubs),
+ PubsOrdered = lists:reverse(PubsProcessed),
AckTags1 = lists:append(AckTags),
PersistentGuids = persistent_guids(PubsOrdered),
HasPersistentPubs = PersistentGuids =/= [],
@@ -956,8 +960,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties},
{SeqIdsAcc, State2}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgProperties1 = MsgProperties,
- {SeqId, State3} = publish(Msg, MsgProperties1, false, IsPersistent1, State2),
+ {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),