diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-30 10:40:19 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-30 10:40:19 +0100 |
| commit | f0da80b00390c1031934824a651f16565f4f6a5a (patch) | |
| tree | a39dbe564ffe81b431d4f72a4ade8859cb92d48c | |
| parent | 5e15b21c8cce36b3755055eb0e2314b4547a6dac (diff) | |
| download | rabbitmq-server-git-f0da80b00390c1031934824a651f16565f4f6a5a.tar.gz | |
:%s/need_confirming/unconfirmed/g
| -rw-r--r-- | src/rabbit_channel.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 42 |
2 files changed, 32 insertions, 32 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a6472e7aa1..40cb1a9503 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -50,7 +50,7 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, confirm_enabled, published_count, confirm_multiple, confirm_tref, - held_confirms, need_confirming, qpid_to_msgs}). + held_confirms, unconfirmed, qpid_to_msgs}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, published_count = 0, confirm_multiple = false, held_confirms = gb_sets:new(), - need_confirming = gb_sets:new(), + unconfirmed = gb_sets:new(), qpid_to_msgs = dict:new() }, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, @@ -281,8 +281,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> handle_cast(flush_multiple_acks, State = #ch{writer_pid = WriterPid, held_confirms = As, - need_confirming = NA}) -> - flush_multiple(WriterPid, As, NA), + unconfirmed = UC}) -> + flush_multiple(WriterPid, As, UC), {noreply, State#ch{held_confirms = gb_sets:new(), confirm_tref = undefined}}; @@ -306,9 +306,9 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, handle_pre_hibernate(State = #ch{writer_pid = WriterPid, held_confirms = As, stats_timer = StatsTimer, - need_confirming = NA}) -> + unconfirmed = UC}) -> ok = clear_permission_cache(), - flush_multiple(WriterPid, As, NA), + flush_multiple(WriterPid, As, UC), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -491,11 +491,11 @@ msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, gb_sets:new()), QTM)} end. -do_if_not_dup(MsgSeqNo, State = #ch{need_confirming = NA}, Fun) -> - case gb_sets:is_element(MsgSeqNo, NA) of +do_if_not_dup(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) -> + case gb_sets:is_element(MsgSeqNo, UC) of true -> State1 = Fun(MsgSeqNo, State), - State1 #ch { need_confirming = gb_sets:delete(MsgSeqNo, NA) }; + State1#ch{unconfirmed = gb_sets:delete(MsgSeqNo, UC)}; false -> State end. @@ -542,8 +542,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Count = State#ch.published_count, {Count, State#ch{published_count = Count + 1, - need_confirming = - gb_sets:add(Count, State#ch.need_confirming) }} + unconfirmed = + gb_sets:add(Count, State#ch.unconfirmed) }} end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 31fb21f6ef..185617d57c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -239,7 +239,7 @@ rates, msgs_on_disk, msg_indices_on_disk, - need_confirming + unconfirmed }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -328,7 +328,7 @@ rates :: rates(), msgs_on_disk :: gb_set(), msg_indices_on_disk :: gb_set(), - need_confirming :: gb_set()}). + unconfirmed :: gb_set()}). -include("rabbit_backing_queue_spec.hrl"). @@ -449,7 +449,7 @@ init(QueueName, IsDurable, Recover, timestamp = Now }, msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), - need_confirming = gb_sets:new()}, + unconfirmed = gb_sets:new()}, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -527,24 +527,24 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount, pending_ack = PA, durable = IsDurable, - need_confirming = NeedConfirming }) -> + unconfirmed = Unconfirmed }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), - NeedConfirming1 = case NeedsConfirming of - true -> gb_sets:add(Guid, NeedConfirming); - false -> NeedConfirming - end, + Unconfirmed1 = case NeedsConfirming of + true -> gb_sets:add(Guid, Unconfirmed); + false -> Unconfirmed + end, {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, persistent_count = PCount1, pending_ack = PA1, - need_confirming = NeedConfirming1 })}. + unconfirmed = Unconfirmed1 })}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, @@ -1044,7 +1044,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount, durable = IsDurable, ram_msg_count = RamMsgCount, - need_confirming = NeedConfirming}) -> + unconfirmed = Unconfirmed}) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, @@ -1054,17 +1054,17 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - NeedConfirming1 = case NeedsConfirming of - true -> gb_sets:add(Guid, NeedConfirming); - false -> NeedConfirming - end, + Unconfirmed1 = case NeedsConfirming of + true -> gb_sets:add(Guid, Unconfirmed); + false -> Unconfirmed + end, {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, ram_msg_count = RamMsgCount + 1, - need_confirming = NeedConfirming1 }}. + unconfirmed = Unconfirmed1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> @@ -1189,24 +1189,24 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> msgs_confirmed(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, - need_confirming = NC }) -> + unconfirmed = UC }) -> State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), - need_confirming = gb_sets:difference(NC, GuidSet) }. + unconfirmed = gb_sets:difference(UC, GuidSet) }. msgs_written_to_disk(QPid, Guids) -> spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( QPid, fun(State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, - need_confirming = NC }) -> + unconfirmed = UC }) -> GuidSet = gb_sets:from_list(Guids), ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD), State1 = State #vqstate { msgs_on_disk = gb_sets:intersection( - gb_sets:union(MOD, GuidSet), NC) }, + gb_sets:union(MOD, GuidSet), UC) }, { msgs_confirmed(ToConfirmMsgs, State1), {confirm, gb_sets:to_list(ToConfirmMsgs)} } end) @@ -1217,14 +1217,14 @@ msg_indices_written_to_disk(QPid, Guids) -> QPid, fun(State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, - need_confirming = NC }) -> + unconfirmed = UC }) -> GuidSet = gb_sets:from_list(Guids), ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), State1 = State #vqstate { msg_indices_on_disk = gb_sets:intersection( - gb_sets:union(MIOD, GuidSet), NC) }, + gb_sets:union(MIOD, GuidSet), UC) }, { msgs_confirmed(ToConfirmMsgs, State1), {confirm, gb_sets:to_list(ToConfirmMsgs)} } end) |
