diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-15 11:39:44 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-15 11:39:44 +0100 |
| commit | d6e9d50eece7328bae67ad2fe01ac59e33d8030b (patch) | |
| tree | 3ef0f9a17c3bc304dfb82a9bdbbbf24f5a19cae4 /src | |
| parent | e3150cae57e5638391390dbfc608dc138c79b0cc (diff) | |
| download | rabbitmq-server-git-d6e9d50eece7328bae67ad2fe01ac59e33d8030b.tar.gz | |
added phantom_deliver. This does everything that deliver does but it doesn't actually read the message. This is useful if the same messages are being tracked in multiple different queues (eg a RAM queue and a disk-backed queue) and you want to mark the message delivered without it being retrieved. It still needs acking in the normal way.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_db_queue.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 38 |
2 files changed, 47 insertions, 22 deletions
diff --git a/src/rabbit_db_queue.erl b/src/rabbit_db_queue.erl index bd6820d5d0..495bdafb23 100644 --- a/src/rabbit_db_queue.erl +++ b/src/rabbit_db_queue.erl @@ -59,7 +59,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]). +-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]). -export([stop/0, stop_and_obliterate/0]). @@ -80,6 +80,8 @@ -spec(deliver/1 :: (queue_name()) -> {'empty' | {msg_id(), binary(), non_neg_integer(), bool(), {msg_id(), seq_id()}}}). +-spec(phantom_deliver/1 :: (queue_name()) -> + { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). @@ -102,6 +104,9 @@ publish(Q, MsgId, Msg) when is_binary(Msg) -> deliver(Q) -> gen_server:call(?SERVER, {deliver, Q}, infinity). +phantom_deliver(Q) -> + gen_server:call(?SERVER, {phantom_deliver, Q}). + ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}). @@ -136,7 +141,10 @@ init([DSN]) -> {ok, State}. handle_call({deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, State), + {ok, Result, State1} = internal_deliver(Q, true, State), + {reply, Result, State1}; +handle_call({phantom_deliver, Q}, _From, State) -> + {ok, Result, State1} = internal_deliver(Q, false, State), {reply, Result, State1}; handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State), @@ -232,7 +240,7 @@ hex_string_to_binary([A,B|Rest], Acc) -> %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, State = #dbstate { db_conn = Conn }) -> +internal_deliver(Q, ReadMsg, State = #dbstate { db_conn = Conn }) -> QStr = binary_to_escaped_string(term_to_binary(Q)), case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of {selected, _, []} -> @@ -252,14 +260,19 @@ internal_deliver(Q, State = #dbstate { db_conn = Conn }) -> MsgId = binary_to_term(hex_string_to_binary(MsgIdStr)), %% yeah, this is really necessary. sigh MsgIdStr2 = binary_to_escaped_string(term_to_binary(MsgId)), - {selected, _, [{MsgBodyStr}]} = - odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2), odbc:sql_query(Conn, "update sequence set next_read = " ++ integer_to_list(ReadSeqId + 1) ++ " where queue = " ++ QStr), - odbc:commit(Conn, commit), - MsgBody = hex_string_to_binary(MsgBodyStr), - BodySize = size(MsgBody), - {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State} + if ReadMsg -> + {selected, _, [{MsgBodyStr}]} = + odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2), + odbc:commit(Conn, commit), + MsgBody = hex_string_to_binary(MsgBodyStr), + BodySize = size(MsgBody), + {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State}; + true -> + odbc:commit(Conn, commit), + {ok, {MsgId, IsDelivered, {MsgId, ReadSeqId}}, State} + end end end. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index e3b47e89dc..1a19fd6f8e 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]). +-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]). -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). @@ -227,6 +227,8 @@ -spec(deliver/1 :: (queue_name()) -> {'empty' | {msg_id(), binary(), non_neg_integer(), bool(), {msg_id(), seq_id()}}}). +-spec(phantom_deliver/1 :: (queue_name()) -> + { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). @@ -251,6 +253,9 @@ publish(Q, MsgId, Msg) when is_binary(Msg) -> deliver(Q) -> gen_server:call(?SERVER, {deliver, Q}, infinity). +phantom_deliver(Q) -> + gen_server:call(?SERVER, {phantom_deliver, Q}). + ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}). @@ -335,7 +340,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {ok, State1 #dqstate { current_file_handle = FileHdl }}. handle_call({deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, State), + {ok, Result, State1} = internal_deliver(Q, true, State), + {reply, Result, State1}; +handle_call({phantom_deliver, Q}, _From, State) -> + {ok, Result, State1} = internal_deliver(Q, false, State), {reply, Result, State1}; handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State), @@ -465,7 +473,7 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mo %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, State = #dqstate { sequences = Sequences }) -> +internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {ok, empty, State}; [{Q, ReadSeqId, WriteSeqId}] -> @@ -475,17 +483,21 @@ internal_deliver(Q, State = #dqstate { sequences = Sequences }) -> #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> [{MsgId, _RefCount, File, Offset, TotalSize}] = dets_ets_lookup(State, MsgId), - {FileHdl, State1} = get_read_handle(File, State), - %% read the message - {ok, {MsgBody, BodySize}} = - read_message_at_offset(FileHdl, Offset, TotalSize), - if Delivered -> ok; - true -> ok = mnesia:dirty_write(rabbit_disk_queue, - Obj #dq_msg_loc {is_delivered = true}) - end, true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), - {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, - State1} + ok = + if Delivered -> ok; + true -> + mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc {is_delivered = true}) + end, + if ReadMsg -> + {FileHdl, State1} = get_read_handle(File, State), + {ok, {MsgBody, BodySize}} = + read_message_at_offset(FileHdl, Offset, TotalSize), + {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, + State1}; + true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State} + end end end. |
