diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-05 12:53:10 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-05 12:53:10 +0000 |
| commit | cbbe81b575def63e9edd47dd1a66e034fc1367b6 (patch) | |
| tree | 0b52b541287ee3f289bc43c70636b263c6c67c24 /src | |
| parent | e01cf9e2d7c60c640d138f8e3da1eb001d146813 (diff) | |
| download | rabbitmq-server-git-cbbe81b575def63e9edd47dd1a66e034fc1367b6.tar.gz | |
Upgrade
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 39 |
1 files changed, 36 insertions, 3 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a78dacecba..08c20ce2d8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,7 +21,7 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). --export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]). +-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -186,6 +186,7 @@ -rabbit_upgrade({add_queue_ttl, local, []}). -rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). -rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}). +-rabbit_upgrade({store_msg, local, [store_msg_size]}). -ifdef(use_specs). @@ -1204,10 +1205,42 @@ store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, store_msg_size_segment(_) -> stop. +store_msg() -> + foreach_queue_index({fun store_msg_journal/1, + fun store_msg_segment/1}). + +store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + Rest/binary>>) -> + {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest}; +store_msg_journal(_) -> + stop. + +store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?MSG_IN_INDEX_SIZE_BITS>>, Rest}; +store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +store_msg_segment(_) -> + stop. -%%---------------------------------------------------------------------------- -%% TODO here? + + +%%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> QueuesDir = queues_dir(), |
