summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-13 15:15:13 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-13 15:15:13 +0100
commitf08c39dc0b10a312644a84a96457d19e95b2e225 (patch)
treed1108be377dc73d08886975c2f7f35d125cab0c5 /src
parent86189f50af06b7a92ef451fb9f49bdbc25550fe8 (diff)
downloadrabbitmq-server-git-f08c39dc0b10a312644a84a96457d19e95b2e225.tar.gz
publish/3 -> publish/2 because the IsDelivered bit is always false when called externally. Also rework requeue, because with the ability to indicate that persistent msgs will already be in msg_store, we don't need to call msg_store:write for persistent msgs, which means that we can also avoid the call to msg_store:remove that would have happened in the call to ack
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl46
1 files changed, 32 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 2b02466979..0ffc2adcc3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/1, publish/3, set_queue_ram_duration_target/2,
+-export([init/1, publish/2, set_queue_ram_duration_target/2,
remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1,
maybe_start_prefetcher/1, purge/1, delete/1, requeue/2,
tx_publish/2, tx_rollback/2, tx_commit/3, do_tx_commit/3]).
@@ -140,14 +140,8 @@ init(QueueName) ->
},
maybe_load_next_segment(State).
-publish(Msg, IsDelivered, State) ->
- publish(Msg, IsDelivered, false, State).
-
-publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk,
- State = #vqstate { next_seq_id = SeqId, len = Len }) ->
- {SeqId, publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered,
- PersistentMsgsAlreadyOnDisk,
- State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 })}.
+publish(Msg, State) ->
+ publish(Msg, false, false, State).
set_queue_ram_duration_target(
DurationTarget, State = #vqstate { avg_egress_rate = EgressRate,
@@ -293,14 +287,32 @@ delete(State) ->
end.
%% [{Msg, AckTag}]
+%% We guarantee that after fetch, only persistent msgs are left on
+%% disk. This means that in a requeue, we set
+%% PersistentMsgsAlreadyOnDisk to true, thus avoiding calls to
+%% msg_store:write for persistent msgs. It also means that we don't
+%% need to worry about calling msg_store:remove (as ack would do)
+%% because transient msgs won't be on disk anyway, thus they won't
+%% need to be removed.
requeue(MsgsWithAckTags, State) ->
- {AckTags, State1} =
+ {SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
- fun ({Msg, AckTag}, {AckTagsAcc, StateN}) ->
- StateN1 = publish(Msg, true, StateN),
- {[AckTag | AckTagsAcc], StateN1}
+ fun ({Msg = #basic_message { guid = MsgId }, AckTag},
+ {SeqIdsAcc, StateN}) ->
+ {_SeqId, StateN1} = publish(Msg, true, true, StateN),
+ SeqIdsAcc1 = case AckTag of
+ ack_not_on_disk ->
+ SeqIdsAcc;
+ {ack_index_and_store, MsgId, SeqId} ->
+ [SeqId | SeqIdsAcc]
+ end,
+ {SeqIdsAcc1, StateN1}
end, {[], State}, MsgsWithAckTags),
- ack(AckTags, State1).
+ IndexState1 = case SeqIds of
+ [] -> IndexState;
+ _ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
+ end,
+ State1 #vqstate { index_state = IndexState1 }.
tx_publish(Msg = #basic_message { is_persistent = true }, State) ->
true = maybe_write_msg_to_disk(true, false, Msg),
@@ -414,6 +426,12 @@ entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId,
index_on_disk = IndexOnDisk }) ->
{MsgId, SeqId, IsDelivered, true, IndexOnDisk}.
+publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk,
+ State = #vqstate { next_seq_id = SeqId, len = Len }) ->
+ {SeqId, publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered,
+ PersistentMsgsAlreadyOnDisk,
+ State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 })}.
+
publish(msg, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk,