diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-09-22 07:53:16 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-09-22 07:53:16 +0100 |
| commit | b573cde7ceee9b1eba416a11aa3087aafcacd1d0 (patch) | |
| tree | 00eff510b6ab5633ac73c31e62a0b07c97d48d67 | |
| parent | ff4c2ad5c0f6879c4e3a0818600e1252424000b5 (diff) | |
| download | rabbitmq-server-git-b573cde7ceee9b1eba416a11aa3087aafcacd1d0.tar.gz | |
do not call msg_store:attrs/2 from disk_queue:{phantom_fetch,prefetch}
It turns out that we don't actually need the 'persistent'
attribute. So this saves us a potentially expensive interaction with
the msg_store.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 4 |
2 files changed, 11 insertions, 20 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 8991939d83..02c20e300c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -94,7 +94,7 @@ {message(), boolean(), ack_tag(), non_neg_integer()})). -spec(phantom_fetch/1 :: (queue_name()) -> ('empty' | - {msg_id(), boolean(), boolean(), ack_tag(), non_neg_integer()})). + {msg_id(), boolean(), ack_tag(), non_neg_integer()})). -spec(prefetch/1 :: (queue_name()) -> 'ok'). -spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok'). -spec(tx_publish/1 :: (message()) -> 'ok'). @@ -213,11 +213,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({fetch, Q}, _From, State) -> - {Result, State1} = - internal_fetch_body(Q, record_delivery, pop_queue, State), + {Result, State1} = internal_fetch_body(Q, pop_queue, State), reply(Result, State1); handle_call({phantom_fetch, Q}, _From, State) -> - Result = internal_fetch_attributes(Q, record_delivery, pop_queue, State), + Result = internal_fetch_attributes(Q, record_delivery, State), reply(Result, State); handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> State1 = @@ -268,8 +267,7 @@ handle_cast({requeue_next_n, Q, N}, State) -> {ok, State1} = internal_requeue_next_n(Q, N, State), noreply(State1); handle_cast({prefetch, Q, From}, State) -> - {Result, State1} = - internal_fetch_body(Q, record_delivery, peek_queue, State), + {Result, State1} = internal_fetch_body(Q, peek_queue, State), case rabbit_misc:with_exit_handler( fun () -> false end, fun () -> @@ -277,7 +275,7 @@ handle_cast({prefetch, Q, From}, State) -> true end) of true -> - internal_fetch_attributes(Q, ignore_delivery, pop_queue, State1); + internal_fetch_attributes(Q, ignore_delivery, State1); false -> ok end, noreply(State1). @@ -367,9 +365,8 @@ sync(State = #dqstate { store = Store, on_sync_txns = Txns }) -> %% internal functions %%---------------------------------------------------------------------------- -internal_fetch_body(Q, MarkDelivered, Advance, - State = #dqstate { store = Store }) -> - case next(Q, MarkDelivered, Advance, State) of +internal_fetch_body(Q, Advance, State = #dqstate { store = Store }) -> + case next(Q, record_delivery, Advance, State) of empty -> {empty, State}; {MsgId, IsDelivered, AckTag, Remaining} -> {Message, Store1} = rabbit_msg_store:read(MsgId, Store), @@ -377,14 +374,8 @@ internal_fetch_body(Q, MarkDelivered, Advance, {{Message, IsDelivered, AckTag, Remaining}, State1} end. -internal_fetch_attributes(Q, MarkDelivered, Advance, - State = #dqstate { store = Store }) -> - case next(Q, MarkDelivered, Advance, State) of - empty -> empty; - {MsgId, IsDelivered, AckTag, Remaining} -> - IsPersistent = rabbit_msg_store:attrs(MsgId, Store), - {MsgId, IsPersistent, IsDelivered, AckTag, Remaining} - end. +internal_fetch_attributes(Q, MarkDelivered, State) -> + next(Q, MarkDelivered, pop_queue, State). next(Q, MarkDelivered, Advance, #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 3015f6dc3b..ddae4da043 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -155,7 +155,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId, %% must call phantom_fetch otherwise the msg remains at the head %% of the queue. This is synchronous, but unavoidable as we need %% the AckTag - {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q), + {MsgId, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q), {ok, AckTag, State1}; publish_delivered(Msg, State = #mqstate { length = 0 }) -> Msg1 = ensure_binary_properties(Msg), @@ -175,7 +175,7 @@ fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, AckTag = case IsDurable andalso IsPersistent of true -> - {MsgId, IsPersistent, IsDelivered, AckTag1, _PRem} + {MsgId, IsDelivered, AckTag1, _PRem} = rabbit_disk_queue:phantom_fetch(Q), AckTag1; false -> |
