diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-03 17:17:58 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-03 17:17:58 +0100 |
| commit | a1a5b973055dbd48c1f5a6240df59c17fa7954d3 (patch) | |
| tree | 9805c285d8e30cc46b688762356c9a994b207bba /src | |
| parent | 954ba7ce3f586acfa8f61b328ee77f0def521240 (diff) | |
| download | rabbitmq-server-git-a1a5b973055dbd48c1f5a6240df59c17fa7954d3.tar.gz | |
stopped a race
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 57 |
1 files changed, 37 insertions, 20 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c33859ee59..9e7b1d2c7b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -238,7 +238,8 @@ in_counter, rates, msgs_on_disk, - msg_indices_on_disk + msg_indices_on_disk, + need_acking }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -326,7 +327,8 @@ in_counter :: non_neg_integer(), rates :: rates(), msgs_on_disk :: gb_set(), - msg_indices_on_disk :: gb_set()}). + msg_indices_on_disk :: gb_set(), + need_acking :: gb_set()}). -include("rabbit_backing_queue_spec.hrl"). @@ -443,7 +445,8 @@ init(QueueName, IsDurable, Recover) -> avg_ingress = 0.0, timestamp = Now }, msgs_on_disk = gb_sets:new(), - msg_indices_on_disk = gb_sets:new()}, + msg_indices_on_disk = gb_sets:new(), + need_acking = gb_sets:new()}, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -511,7 +514,8 @@ publish(Msg, State) -> publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; -publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, +publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, + guid = Guid }, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -529,7 +533,8 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, out_counter = OutCount + 1, in_counter = InCount + 1, persistent_count = PCount1, - pending_ack = PA1 })}. + pending_ack = PA1, + need_acking = gb_sets:insert(Guid, State1#vqstate.need_acking)})}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, @@ -1014,7 +1019,8 @@ remove_queue_entries1( %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent }, +publish(Msg = #basic_message { is_persistent = IsPersistent, + guid = Guid }, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, @@ -1036,7 +1042,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1}}. + ram_msg_count = RamMsgCount + 1, + need_acking = gb_sets:add(Guid, State2#vqstate.need_acking) }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> @@ -1136,9 +1143,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ok = orddict:fold(fun (MsgStore, Guids, ok) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), - AckdGuids = lists:append(orddict:fold(fun (_, Guids, Acc) -> - [Guids || Acc] - end, [], GuidsByStore)), + AckdGuids = lists:append([Guids || + {_Store, Guids} <- orddict:to_list(GuidsByStore)]), State2 = msgs_confirmed(AckdGuids, State1), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; @@ -1162,26 +1168,33 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> %%---------------------------------------------------------------------------- msgs_confirmed(Guids, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD }) -> + msg_indices_on_disk = MIOD, + need_acking = NA }) -> + GuidSet = gb_sets:from_list(Guids), State #vqstate { msgs_on_disk = - gb_sets:difference(MOD, gb_sets:from_list(Guids)), + gb_sets:difference(MOD, GuidSet), msg_indices_on_disk = - gb_sets:delete_any(MIOD, gb_sets:from_list(Guids)) }. + gb_sets:difference(MIOD, GuidSet), + need_acking = + gb_sets:difference(NA, GuidSet) }. msgs_written_to_disk(QPid, Guids) -> spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( QPid, fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD}) -> + msg_indices_on_disk = MIOD, + need_acking = NA }) -> GuidSet = gb_sets:from_list(Guids), ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD), + MOD1 = gb_sets:intersection(gb_sets:union(MOD, GuidSet), NA), { State #vqstate { msgs_on_disk = - gb_sets:difference(gb_sets:union(MOD, GuidSet), - ToConfirmMsgs), + gb_sets:difference(MOD1, ToConfirmMsgs), msg_indices_on_disk = - gb_sets:difference(MIOD, ToConfirmMsgs) }, + gb_sets:difference(MIOD, ToConfirmMsgs), + need_acking = + gb_sets:difference(NA, ToConfirmMsgs) }, {confirm, gb_sets:to_list(ToConfirmMsgs)} } end) end). @@ -1191,15 +1204,19 @@ msg_indices_written_to_disk(Guids) -> spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD}) -> + msg_indices_on_disk = MIOD, + need_acking = NA }) -> GuidSet = gb_sets:from_list(Guids), ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), + MIOD1 = + gb_sets:intersection(gb_sets:union(MIOD, GuidSet), NA), { State #vqstate { msgs_on_disk = gb_sets:difference(MOD, ToConfirmMsgs), msg_indices_on_disk = - gb_sets:difference(gb_sets:union(MIOD, GuidSet), - ToConfirmMsgs) }, + gb_sets:difference(MIOD1, ToConfirmMsgs), + need_acking = + gb_sets:difference(NA, ToConfirmMsgs) }, {confirm, gb_sets:to_list(ToConfirmMsgs)} } end) end). |
