diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-08-19 11:21:00 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-08-19 11:21:00 +0100 |
| commit | 4f47fdefbc9b0189522a7e6b96b8a5a84eaf5ae9 (patch) | |
| tree | 99c8b3d388a5236767dd4aa21926ad2dedb46280 | |
| parent | 9994b41afc949a19ada7afc2b3550ba04ad58a64 (diff) | |
| download | rabbitmq-server-git-4f47fdefbc9b0189522a7e6b96b8a5a84eaf5ae9.tar.gz | |
Expiry is reset on tx commit now. TX publish messages have correct TTL semantics
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 9 |
2 files changed, 25 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index abd09a6343..2810a15622 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -450,6 +450,19 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> maybe_run_queue_via_backing_queue( fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). +fetch(AckRequired, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; + {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> + case msg_expired(MsgProperties) of + true -> + fetch(AckRequired, State#q{backing_queue_state = BQS1}); + false -> + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + end + end. + add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> @@ -565,6 +578,12 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) -> + false; +msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now > Expiry. + reset_msg_expiry_fun(State) -> fun(MsgProps) -> MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} @@ -621,24 +640,6 @@ emit_stats(State) -> [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). %--------------------------------------------------------------------------- -fetch(AckRequired, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} = Result -> {empty, State#q{backing_queue_state = BQS1}}; - {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> - case msg_expired(MsgProperties) of - true -> - fetch(AckRequired, State#q{backing_queue_state = BQS1}); - false -> - {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} - end - end. - -msg_expired(MsgProperties = #msg_properties{expiry = undefined}) -> - false; -msg_expired(MsgProperties = #msg_properties{expiry=Expiry}) -> - Now = timer:now_diff(now(), {0,0,0}), - Now > Expiry. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 591052a601..4e978fd5ad 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -612,7 +612,11 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - PubsOrdered = lists:reverse(Pubs), + F = fun({Msg, MsgProperties}) -> + {Msg, MsgPropsFun(MsgProperties)} + end, + PubsProcessed = lists:map(F, Pubs), + PubsOrdered = lists:reverse(PubsProcessed), AckTags1 = lists:append(AckTags), PersistentGuids = persistent_guids(PubsOrdered), HasPersistentPubs = PersistentGuids =/= [], @@ -956,8 +960,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgProperties1 = MsgProperties, - {SeqId, State3} = publish(Msg, MsgProperties1, false, IsPersistent1, State2), + {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), |
