summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-09-22 07:53:16 +0100
committerMatthias Radestock <matthias@lshift.net>2009-09-22 07:53:16 +0100
commitb573cde7ceee9b1eba416a11aa3087aafcacd1d0 (patch)
tree00eff510b6ab5633ac73c31e62a0b07c97d48d67
parentff4c2ad5c0f6879c4e3a0818600e1252424000b5 (diff)
downloadrabbitmq-server-git-b573cde7ceee9b1eba416a11aa3087aafcacd1d0.tar.gz
do not call msg_store:attrs/2 from disk_queue:{phantom_fetch,prefetch}
It turns out that we don't actually need the 'persistent' attribute. So this saves us a potentially expensive interaction with the msg_store.
-rw-r--r--src/rabbit_disk_queue.erl27
-rw-r--r--src/rabbit_mixed_queue.erl4
2 files changed, 11 insertions, 20 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 8991939d83..02c20e300c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -94,7 +94,7 @@
{message(), boolean(), ack_tag(), non_neg_integer()})).
-spec(phantom_fetch/1 :: (queue_name()) ->
('empty' |
- {msg_id(), boolean(), boolean(), ack_tag(), non_neg_integer()})).
+ {msg_id(), boolean(), ack_tag(), non_neg_integer()})).
-spec(prefetch/1 :: (queue_name()) -> 'ok').
-spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok').
-spec(tx_publish/1 :: (message()) -> 'ok').
@@ -213,11 +213,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({fetch, Q}, _From, State) ->
- {Result, State1} =
- internal_fetch_body(Q, record_delivery, pop_queue, State),
+ {Result, State1} = internal_fetch_body(Q, pop_queue, State),
reply(Result, State1);
handle_call({phantom_fetch, Q}, _From, State) ->
- Result = internal_fetch_attributes(Q, record_delivery, pop_queue, State),
+ Result = internal_fetch_attributes(Q, record_delivery, State),
reply(Result, State);
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
State1 =
@@ -268,8 +267,7 @@ handle_cast({requeue_next_n, Q, N}, State) ->
{ok, State1} = internal_requeue_next_n(Q, N, State),
noreply(State1);
handle_cast({prefetch, Q, From}, State) ->
- {Result, State1} =
- internal_fetch_body(Q, record_delivery, peek_queue, State),
+ {Result, State1} = internal_fetch_body(Q, peek_queue, State),
case rabbit_misc:with_exit_handler(
fun () -> false end,
fun () ->
@@ -277,7 +275,7 @@ handle_cast({prefetch, Q, From}, State) ->
true
end) of
true ->
- internal_fetch_attributes(Q, ignore_delivery, pop_queue, State1);
+ internal_fetch_attributes(Q, ignore_delivery, State1);
false -> ok
end,
noreply(State1).
@@ -367,9 +365,8 @@ sync(State = #dqstate { store = Store, on_sync_txns = Txns }) ->
%% internal functions
%%----------------------------------------------------------------------------
-internal_fetch_body(Q, MarkDelivered, Advance,
- State = #dqstate { store = Store }) ->
- case next(Q, MarkDelivered, Advance, State) of
+internal_fetch_body(Q, Advance, State = #dqstate { store = Store }) ->
+ case next(Q, record_delivery, Advance, State) of
empty -> {empty, State};
{MsgId, IsDelivered, AckTag, Remaining} ->
{Message, Store1} = rabbit_msg_store:read(MsgId, Store),
@@ -377,14 +374,8 @@ internal_fetch_body(Q, MarkDelivered, Advance,
{{Message, IsDelivered, AckTag, Remaining}, State1}
end.
-internal_fetch_attributes(Q, MarkDelivered, Advance,
- State = #dqstate { store = Store }) ->
- case next(Q, MarkDelivered, Advance, State) of
- empty -> empty;
- {MsgId, IsDelivered, AckTag, Remaining} ->
- IsPersistent = rabbit_msg_store:attrs(MsgId, Store),
- {MsgId, IsPersistent, IsDelivered, AckTag, Remaining}
- end.
+internal_fetch_attributes(Q, MarkDelivered, State) ->
+ next(Q, MarkDelivered, pop_queue, State).
next(Q, MarkDelivered, Advance, #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 3015f6dc3b..ddae4da043 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -155,7 +155,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
%% must call phantom_fetch otherwise the msg remains at the head
%% of the queue. This is synchronous, but unavoidable as we need
%% the AckTag
- {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q),
+ {MsgId, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q),
{ok, AckTag, State1};
publish_delivered(Msg, State = #mqstate { length = 0 }) ->
Msg1 = ensure_binary_properties(Msg),
@@ -175,7 +175,7 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
AckTag =
case IsDurable andalso IsPersistent of
true ->
- {MsgId, IsPersistent, IsDelivered, AckTag1, _PRem}
+ {MsgId, IsDelivered, AckTag1, _PRem}
= rabbit_disk_queue:phantom_fetch(Q),
AckTag1;
false ->