summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-04 15:33:26 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-04 15:33:26 +0100
commit4f7984cc8c35fbe512883ff2bc1784690607d679 (patch)
tree4ec1e510a37a27b5059eb257735de597cc406e4d /src
parent433ce28978c03c72895ee092fee949c24b04d0af (diff)
downloadrabbitmq-server-git-4f7984cc8c35fbe512883ff2bc1784690607d679.tar.gz
refactor
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_tests.erl7
-rw-r--r--src/rabbit_variable_queue.erl69
4 files changed, 40 insertions, 43 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f1085a0c74..72b9b49dac 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -422,9 +422,8 @@ record_confirm_message(#delivery{msg_seq_no = MsgSeqNo,
ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- AckdGuids = BQ:seqids_to_guids(AckTags, BQS),
- confirm_messages(AckdGuids,
- State#q{backing_queue_state = BQ:ack(AckTags, BQS)}).
+ {BQS1, AckdGuids} = BQ:ack(AckTags, BQS),
+ confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 787fc82ccb..d641e824c7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -91,7 +91,7 @@
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
--spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
+-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok').
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ddb78aff12..e03d1e94bb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1854,7 +1854,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {VQ9, _} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1864,7 +1864,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
+ {VQ3, _} = rabbit_variable_queue:ack([AckTag], VQ2),
+ publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -1897,7 +1898,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {VQ9, _} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a72ec2f7b6..c687ae02bf 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -34,7 +34,7 @@
-export([init/5, init/3, terminate/1, delete_and_terminate/1,
purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
- requeue/2, len/1, is_empty/1, seqids_to_guids/2,
+ requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1]).
@@ -662,34 +662,27 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
end)}.
requeue(AckTags, State) ->
- a(reduce_memory_use(
- ack(fun rabbit_msg_store:release/2,
- fun (#msg_status { msg = Msg }, State1) ->
- {_SeqId, State2} = publish(Msg, true, false, false, State1),
- State2;
- ({IsPersistent, Guid}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- read_from_msg_store(MSCState, IsPersistent, Guid),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, true, true, false, State2),
- State3
- end,
- AckTags, State))).
+ {State1, _Guids} =
+ a(reduce_memory_use(
+ ack(fun rabbit_msg_store:release/2,
+ fun (#msg_status { msg = Msg }, State1) ->
+ {_SeqId, State2} = publish(Msg, true, false, false, State1),
+ State2;
+ ({IsPersistent, Guid}, State1) ->
+ #vqstate { msg_store_clients = MSCState } = State1,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ read_from_msg_store(MSCState, IsPersistent, Guid),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ {_SeqId, State3} = publish(Msg, true, true, false, State2),
+ State3
+ end,
+ AckTags, State))),
+ State1.
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) ->
- lists:foldl(
- fun(SeqId, Guids) ->
- [case dict:fetch(SeqId, PA) of
- #msg_status { msg = Msg } -> Msg#basic_message.guid;
- {_, Guid} -> Guid
- end | Guids]
- end, [], SeqIds).
-
set_ram_duration_target(DurationTarget,
State = #vqstate {
rates = #rates { avg_egress = AvgEgressRate,
@@ -789,6 +782,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
%% Minor helpers
%%----------------------------------------------------------------------------
+a({State, Other}) ->
+ {a(State), Other};
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
persistent_count = PersistentCount,
@@ -990,6 +985,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
Pubs = lists:append(lists:reverse(SPubs)),
+ {NewState, _Guids} = ack(Acks, State),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
@@ -997,7 +993,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
IsPersistent1 = IsDurable andalso IsPersistent,
{SeqId, State3} = publish(Msg, false, IsPersistent1, false, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, ack(Acks, State)}, Pubs),
+ end, {PAcks, NewState}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1166,28 +1162,27 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- State;
+ {State, []};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
- persistent_count = PCount }} =
+ {Guids, {SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
+ persistent_count = PCount }} =
lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
+ fun (SeqId, {Gs, Acc, State2 = #vqstate { pending_ack = PA }}) ->
{ok, AckEntry} = dict:find(SeqId, PA),
- {accumulate_ack(SeqId, AckEntry, Acc),
+ {[AckEntry | Gs],
+ accumulate_ack(SeqId, AckEntry, Acc),
Fun(AckEntry, State2 #vqstate {
pending_ack = dict:erase(SeqId, PA) })}
- end, {{[], orddict:new()}, State}, AckTags),
+ end, {[], {[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
- %% the AckTags were removed from State1, so use State in seqids_to_guids
- State2 = remove_confirms(
- gb_sets:from_list(seqids_to_guids(AckTags, State)), State1),
+ State2 = remove_confirms(gb_sets:from_list(Guids), State1),
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- State2 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }.
+ {State2 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 }, Guids}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1289,6 +1284,8 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
end
end.
+reduce_memory_use({State, Other}) ->
+ {reduce_memory_use(State), Other};
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun limit_ram_index/2,