summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-05-14 16:33:40 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-05-14 16:33:40 +0100
commita62c475c3f3b66251783235c4209cfac736e0292 (patch)
tree577d30bb30a0939f7f427b506072c018bb3a6d72
parente7996f92e43a99f74dec6d25f70cff865134aed5 (diff)
parent88548854aec0fd482b2a42993507ae9583669466 (diff)
downloadrabbitmq-server-git-a62c475c3f3b66251783235c4209cfac736e0292.tar.gz
Merge bug25390
-rw-r--r--src/rabbit_queue_index.erl32
1 files changed, 27 insertions, 5 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index ea70208fac..847d39c143 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -23,7 +23,7 @@
-export([scan/3]).
--export([add_queue_ttl/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -123,9 +123,9 @@
-define(REL_SEQ_BITS, 14).
-define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))).
-%% seq only is binary 00 followed by 14 bits of rel seq id
+%% seq only is binary 01 followed by 14 bits of rel seq id
%% (range: 0 - 16383)
--define(REL_SEQ_ONLY_PREFIX, 00).
+-define(REL_SEQ_ONLY_PREFIX, 01).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
@@ -171,6 +171,7 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({add_queue_ttl, local, []}).
+-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
-ifdef(use_specs).
@@ -715,7 +716,11 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
- {ok, Bin} ->
+ %% Journal entry composed only of zeroes was probably
+ %% produced during a dirty shutdown so stop reading
+ {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
+ State;
+ {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
{MsgId, MsgProps} = parse_pub_record_body(Bin),
IsPersistent = case Prefix of
?PUB_PERSIST_JPREFIX -> true;
@@ -1057,6 +1062,21 @@ add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
add_queue_ttl_segment(_) ->
stop.
+avoid_zeroes() ->
+ foreach_queue_index({none, fun avoid_zeroes_segment/1}).
+
+avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest};
+avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+avoid_zeroes_segment(_) ->
+ stop.
+
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
@@ -1081,7 +1101,9 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
|| Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)],
ok = gatherer:finish(Gatherer).
-transform_file(Path, Fun) ->
+transform_file(_Path, none) ->
+ ok;
+transform_file(Path, Fun) when is_function(Fun)->
PathTmp = Path ++ ".upgrade",
case rabbit_file:file_size(Path) of
0 -> ok;