summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-03 17:17:58 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-03 17:17:58 +0100
commita1a5b973055dbd48c1f5a6240df59c17fa7954d3 (patch)
tree9805c285d8e30cc46b688762356c9a994b207bba
parent954ba7ce3f586acfa8f61b328ee77f0def521240 (diff)
downloadrabbitmq-server-git-a1a5b973055dbd48c1f5a6240df59c17fa7954d3.tar.gz
stopped a race
-rw-r--r--src/rabbit_variable_queue.erl57
1 files changed, 37 insertions, 20 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c33859ee59..9e7b1d2c7b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -238,7 +238,8 @@
in_counter,
rates,
msgs_on_disk,
- msg_indices_on_disk
+ msg_indices_on_disk,
+ need_acking
}).
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -326,7 +327,8 @@
in_counter :: non_neg_integer(),
rates :: rates(),
msgs_on_disk :: gb_set(),
- msg_indices_on_disk :: gb_set()}).
+ msg_indices_on_disk :: gb_set(),
+ need_acking :: gb_set()}).
-include("rabbit_backing_queue_spec.hrl").
@@ -443,7 +445,8 @@ init(QueueName, IsDurable, Recover) ->
avg_ingress = 0.0,
timestamp = Now },
msgs_on_disk = gb_sets:new(),
- msg_indices_on_disk = gb_sets:new()},
+ msg_indices_on_disk = gb_sets:new(),
+ need_acking = gb_sets:new()},
a(maybe_deltas_to_betas(State)).
terminate(State) ->
@@ -511,7 +514,8 @@ publish(Msg, State) ->
publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
-publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
+publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
+ guid = Guid },
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -529,7 +533,8 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- pending_ack = PA1 })}.
+ pending_ack = PA1,
+ need_acking = gb_sets:insert(Guid, State1#vqstate.need_acking)})}.
fetch(AckRequired, State = #vqstate { q4 = Q4,
ram_msg_count = RamMsgCount,
@@ -1014,7 +1019,8 @@ remove_queue_entries1(
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent },
+publish(Msg = #basic_message { is_persistent = IsPersistent,
+ guid = Guid },
IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
@@ -1036,7 +1042,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1}}.
+ ram_msg_count = RamMsgCount + 1,
+ need_acking = gb_sets:add(Guid, State2#vqstate.need_acking) }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, MSCState) ->
@@ -1136,9 +1143,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
- AckdGuids = lists:append(orddict:fold(fun (_, Guids, Acc) ->
- [Guids || Acc]
- end, [], GuidsByStore)),
+ AckdGuids = lists:append([Guids ||
+ {_Store, Guids} <- orddict:to_list(GuidsByStore)]),
State2 = msgs_confirmed(AckdGuids, State1),
PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;
@@ -1162,26 +1168,33 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
%%----------------------------------------------------------------------------
msgs_confirmed(Guids, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD }) ->
+ msg_indices_on_disk = MIOD,
+ need_acking = NA }) ->
+ GuidSet = gb_sets:from_list(Guids),
State #vqstate {
msgs_on_disk =
- gb_sets:difference(MOD, gb_sets:from_list(Guids)),
+ gb_sets:difference(MOD, GuidSet),
msg_indices_on_disk =
- gb_sets:delete_any(MIOD, gb_sets:from_list(Guids)) }.
+ gb_sets:difference(MIOD, GuidSet),
+ need_acking =
+ gb_sets:difference(NA, GuidSet) }.
msgs_written_to_disk(QPid, Guids) ->
spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
QPid,
fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD}) ->
+ msg_indices_on_disk = MIOD,
+ need_acking = NA }) ->
GuidSet = gb_sets:from_list(Guids),
ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD),
+ MOD1 = gb_sets:intersection(gb_sets:union(MOD, GuidSet), NA),
{ State #vqstate {
msgs_on_disk =
- gb_sets:difference(gb_sets:union(MOD, GuidSet),
- ToConfirmMsgs),
+ gb_sets:difference(MOD1, ToConfirmMsgs),
msg_indices_on_disk =
- gb_sets:difference(MIOD, ToConfirmMsgs) },
+ gb_sets:difference(MIOD, ToConfirmMsgs),
+ need_acking =
+ gb_sets:difference(NA, ToConfirmMsgs) },
{confirm, gb_sets:to_list(ToConfirmMsgs)} }
end)
end).
@@ -1191,15 +1204,19 @@ msg_indices_written_to_disk(Guids) ->
spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self,
fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD}) ->
+ msg_indices_on_disk = MIOD,
+ need_acking = NA }) ->
GuidSet = gb_sets:from_list(Guids),
ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD),
+ MIOD1 =
+ gb_sets:intersection(gb_sets:union(MIOD, GuidSet), NA),
{ State #vqstate {
msgs_on_disk =
gb_sets:difference(MOD, ToConfirmMsgs),
msg_indices_on_disk =
- gb_sets:difference(gb_sets:union(MIOD, GuidSet),
- ToConfirmMsgs) },
+ gb_sets:difference(MIOD1, ToConfirmMsgs),
+ need_acking =
+ gb_sets:difference(NA, ToConfirmMsgs) },
{confirm, gb_sets:to_list(ToConfirmMsgs)} }
end)
end).