summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl23
-rw-r--r--src/rabbit_mixed_queue.erl5
2 files changed, 14 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8fe0d62386..80051149fe 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -90,13 +90,14 @@ start_link(Q) ->
%%----------------------------------------------------------------------------
-init(Q) ->
+init(Q = #amqqueue { name = QName }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ {ok, MS} = rabbit_mixed_queue:start_link(QName, mixed), %% TODO, CHANGE ME
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
- mixed_state = rabbit_mixed_queue:start_link(qname(Q), mixed), %% TODO, CHANGE ME
+ mixed_state = MS,
next_msg_id = 1,
round_robin = queue:new()}, ?HIBERNATE_AFTER}.
@@ -437,17 +438,17 @@ commit_transaction(Txn, State) ->
{MsgWithAcks, Remaining} =
collect_messages(PendingAcksOrdered, UAM),
store_ch_record(C#cr{unacked_messages = Remaining}),
- MS = rabbit_mixed_queue:tx_commit(
- PendingMessagesOrdered,
- lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
- State #q.mixed_state),
+ {ok, MS} = rabbit_mixed_queue:tx_commit(
+ PendingMessagesOrdered,
+ lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
+ State #q.mixed_state),
State #q { mixed_state = MS }
end.
rollback_transaction(Txn, State) ->
#tx { pending_messages = PendingMessages
} = lookup_tx(Txn),
- MS = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state),
+ {ok, MS} = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state),
State #q { mixed_state = MS }.
%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C
@@ -631,12 +632,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
mixed_state = MS,
round_robin = RoundRobin}) ->
- {Length, MS2} = rabbit_mixed_queue:length(MS),
- reply({ok, Name, Length, queue:len(RoundRobin)}, State #q { mixed_state = MS2 });
+ Length = rabbit_mixed_queue:length(MS),
+ reply({ok, Name, Length, queue:len(RoundRobin)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q { mixed_state = MS }) ->
- {Length, MS2} = rabbit_mixed_queue:length(MS),
+ Length = rabbit_mixed_queue:length(MS),
IsEmpty = Length == 0,
IsUnused = is_unused(),
if
@@ -645,7 +646,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IfUnused and not(IsUnused) ->
reply({error, in_use}, State);
true ->
- {stop, normal, {ok, Length}, State #q { mixed_state = MS2 }}
+ {stop, normal, {ok, Length}, State}
end;
handle_call(purge, _From, State) ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 24d0de8d59..037aeebf18 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -204,10 +204,9 @@ purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) ->
{Count, State #mqstate { msg_buf = queue:new() }}.
length(State = #mqstate { queue = Q, mode = disk }) ->
- Length = rabbit_disk_queue:length(Q),
- {Length, State};
+ rabbit_disk_queue:length(Q);
length(State = #mqstate { mode = mixed, msg_buf = MsgBuf }) ->
- {queue:length(MsgBuf), State}.
+ queue:len(MsgBuf).
is_empty(State) ->
0 == rabbit_mixed_queue:length(State).