summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl53
1 files changed, 27 insertions, 26 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c2f90bac77..031ed882e6 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,11 +31,12 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, publish/2, publish_delivered/3,
- set_ram_duration_target/2, ram_duration/1, fetch/2, ack/2, len/1,
- is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/3,
- tx_ack/3, tx_rollback/2, tx_commit/3, needs_sync/1, sync/1,
- handle_pre_hibernate/1, status/1]).
+-export([init/3, terminate/1, delete_and_terminate/1,
+ purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
+ tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
+ requeue/2, len/1, is_empty/1,
+ set_ram_duration_target/2, ram_duration/1,
+ needs_sync/1, sync/1, handle_pre_hibernate/1, status/1]).
-export([start/1]).
@@ -693,6 +694,23 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
is_persistent = IsPersistent, is_delivered = false,
msg_on_disk = false, index_on_disk = false }.
+find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
+find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
+
+with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) ->
+ {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP),
+ {Result, {{MSCStateP1, PRef}, MSCStateT}};
+with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) ->
+ {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
+ {Result, {MSCStateP, {MSCStateT1, TRef}}}.
+
+read_from_msg_store(MSCState, IsPersistent, Guid) ->
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MsgStore, MSCState1) ->
+ rabbit_msg_store:read(MsgStore, Guid, MSCState1)
+ end).
+
maybe_write_delivered(false, _SeqId, IndexState) ->
IndexState;
maybe_write_delivered(true, SeqId, IndexState) ->
@@ -1174,25 +1192,8 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
Q2) }
end.
-find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
-find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
-
-with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) ->
- {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP),
- {Result, {{MSCStateP1, PRef}, MSCStateT}};
-with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) ->
- {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
- {Result, {MSCStateP, {MSCStateT1, TRef}}}.
-
-read_from_msg_store(MSCState, IsPersistent, Guid) ->
- with_msg_store_state(
- MSCState, IsPersistent,
- fun (MsgStore, MSCState1) ->
- rabbit_msg_store:read(MsgStore, Guid, MSCState1)
- end).
-
-maybe_write_msg_to_disk(_Force, MsgStatus =
- #msg_status { msg_on_disk = true }, MSCState) ->
+maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
+ msg_on_disk = true }, MSCState) ->
{MsgStatus, MSCState};
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, guid = Guid,
@@ -1209,8 +1210,8 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
{MsgStatus, MSCState}.
-maybe_write_index_to_disk(_Force, MsgStatus =
- #msg_status { index_on_disk = true }, IndexState) ->
+maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
+ index_on_disk = true }, IndexState) ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {