summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-14 16:31:01 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-14 16:31:01 +0100
commitcc86d0f3ff7de862626f7a55d9de30f3a5969470 (patch)
tree43db90c95c1c2d3bad604a8df7bc5219777095e7
parent68ffe14519589afcd689141855602c3c94c16366 (diff)
downloadrabbitmq-server-git-cc86d0f3ff7de862626f7a55d9de30f3a5969470.tar.gz
some initial fixes
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_variable_queue.erl10
2 files changed, 8 insertions, 4 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index a38732bdba..b58c5a7f6f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -456,7 +456,7 @@ find_ack_counts_and_deliver_transient_msgs(Dir) ->
scatter_journal(Dir, TotalMsgCount, AckCounts, TransientADict) ->
JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME),
case file:open(JournalPath, [read, read_ahead, raw, binary]) of
- {error, enoent} -> AckCounts;
+ {error, enoent} -> {TotalMsgCount, AckCounts};
{ok, Hdl} ->
%% ADict may well contain duplicates. However, this is ok,
%% due to the use of sets in replay_journal_acks_to_segment
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9ca06a1c90..a7a07556e8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -167,6 +167,7 @@ set_queue_ram_duration_target(
State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 },
if TargetRamMsgCount == TargetRamMsgCount1 ->
State1;
+ TargetRamMsgCount == undefined orelse
TargetRamMsgCount < TargetRamMsgCount1 ->
maybe_start_prefetcher(State1);
true ->
@@ -252,7 +253,10 @@ maybe_start_prefetcher(State = #vqstate {
q1 = Q1, q3 = Q3, prefetcher = undefined
}) ->
%% prefetched content takes priority over q1
- AvailableSpace = (TargetRamMsgCount - RamMsgCount) + queue:len(Q1),
+ AvailableSpace = case TargetRamMsgCount of
+ undefined -> queue:len(Q3);
+ _ -> (TargetRamMsgCount - RamMsgCount) + queue:len(Q1)
+ end,
PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]),
if PrefetchCount =< 0 -> State;
true ->
@@ -602,7 +606,7 @@ drain_prefetcher(DrainOrStop,
reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
- when TargetRamMsgCount >= RamMsgCount ->
+ when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount ->
State;
reduce_memory_use(State =
#vqstate { target_ram_msg_count = TargetRamMsgCount }) ->
@@ -726,7 +730,7 @@ maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
#vqstate { ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
- when TargetRamMsgCount >= RamMsgCount ->
+ when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount ->
State;
maybe_push_alphas_to_betas(Generator, Consumer, Q, State =
#vqstate { ram_msg_count = RamMsgCount }) ->