diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-13 15:15:13 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-13 15:15:13 +0100 |
| commit | f08c39dc0b10a312644a84a96457d19e95b2e225 (patch) | |
| tree | d1108be377dc73d08886975c2f7f35d125cab0c5 /src | |
| parent | 86189f50af06b7a92ef451fb9f49bdbc25550fe8 (diff) | |
| download | rabbitmq-server-git-f08c39dc0b10a312644a84a96457d19e95b2e225.tar.gz | |
publish/3 -> publish/2 because the IsDelivered bit is always false when called externally. Also rework requeue, because with the ability to indicate that persistent msgs will already be in msg_store, we don't need to call msg_store:write for persistent msgs, which means that we can also avoid the call to msg_store:remove that would have happened in the call to ack
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 46 |
1 files changed, 32 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 2b02466979..0ffc2adcc3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/1, publish/3, set_queue_ram_duration_target/2, +-export([init/1, publish/2, set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, tx_publish/2, tx_rollback/2, tx_commit/3, do_tx_commit/3]). @@ -140,14 +140,8 @@ init(QueueName) -> }, maybe_load_next_segment(State). -publish(Msg, IsDelivered, State) -> - publish(Msg, IsDelivered, false, State). - -publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk, - State = #vqstate { next_seq_id = SeqId, len = Len }) -> - {SeqId, publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered, - PersistentMsgsAlreadyOnDisk, - State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 })}. +publish(Msg, State) -> + publish(Msg, false, false, State). set_queue_ram_duration_target( DurationTarget, State = #vqstate { avg_egress_rate = EgressRate, @@ -293,14 +287,32 @@ delete(State) -> end. %% [{Msg, AckTag}] +%% We guarantee that after fetch, only persistent msgs are left on +%% disk. This means that in a requeue, we set +%% PersistentMsgsAlreadyOnDisk to true, thus avoiding calls to +%% msg_store:write for persistent msgs. It also means that we don't +%% need to worry about calling msg_store:remove (as ack would do) +%% because transient msgs won't be on disk anyway, thus they won't +%% need to be removed. requeue(MsgsWithAckTags, State) -> - {AckTags, State1} = + {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun ({Msg, AckTag}, {AckTagsAcc, StateN}) -> - StateN1 = publish(Msg, true, StateN), - {[AckTag | AckTagsAcc], StateN1} + fun ({Msg = #basic_message { guid = MsgId }, AckTag}, + {SeqIdsAcc, StateN}) -> + {_SeqId, StateN1} = publish(Msg, true, true, StateN), + SeqIdsAcc1 = case AckTag of + ack_not_on_disk -> + SeqIdsAcc; + {ack_index_and_store, MsgId, SeqId} -> + [SeqId | SeqIdsAcc] + end, + {SeqIdsAcc1, StateN1} end, {[], State}, MsgsWithAckTags), - ack(AckTags, State1). + IndexState1 = case SeqIds of + [] -> IndexState; + _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) + end, + State1 #vqstate { index_state = IndexState1 }. tx_publish(Msg = #basic_message { is_persistent = true }, State) -> true = maybe_write_msg_to_disk(true, false, Msg), @@ -414,6 +426,12 @@ entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId, index_on_disk = IndexOnDisk }) -> {MsgId, SeqId, IsDelivered, true, IndexOnDisk}. +publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk, + State = #vqstate { next_seq_id = SeqId, len = Len }) -> + {SeqId, publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered, + PersistentMsgsAlreadyOnDisk, + State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 })}. + publish(msg, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk, |
