summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-12 23:42:13 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-12 23:42:13 +0000
commitf553fc420e9b74fac660ecf8cf37a09ef926176a (patch)
tree3cd3dd7681a1899e4ad02f73be5a61aa7015609c
parentbb0605e3d608e549bf06be2f23091dda4c9e31ae (diff)
parent41f78d73795134f11554072f7409f40f1a2167d2 (diff)
downloadrabbitmq-server-git-f553fc420e9b74fac660ecf8cf37a09ef926176a.tar.gz
merge default into bug25395
-rw-r--r--src/rabbit_variable_queue.erl45
1 files changed, 24 insertions, 21 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8cb5da0b2e..baeb4721ab 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -879,11 +879,25 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, IsDelivered, SeqId,
- Msg = #basic_message { id = MsgId }, MsgProps) ->
- #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = false, index_on_disk = false,
- msg_props = MsgProps }.
+ Msg = #basic_message {id = MsgId}, MsgProps) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = false,
+ index_on_disk = false,
+ msg_props = MsgProps}.
+
+beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps}.
trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
@@ -950,7 +964,7 @@ maybe_write_delivered(true, SeqId, IndexState) ->
betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
+ fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
{Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -958,21 +972,10 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
[SeqId | Acks1]};
false -> case (gb_trees:is_defined(SeqId, RPA) orelse
gb_trees:is_defined(SeqId, DPA)) of
- false ->
- {?QUEUE:in_r(
- m(#msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }), Filtered1),
- Delivers1, Acks1};
- true ->
- Acc
+ false -> {?QUEUE:in_r(m(beta_msg_status(M)),
+ Filtered1),
+ Delivers1, Acks1};
+ true -> Acc
end
end
end, {?QUEUE:new(), [], []}, List),