summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-15 11:39:44 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-15 11:39:44 +0100
commitd6e9d50eece7328bae67ad2fe01ac59e33d8030b (patch)
tree3ef0f9a17c3bc304dfb82a9bdbbbf24f5a19cae4
parente3150cae57e5638391390dbfc608dc138c79b0cc (diff)
downloadrabbitmq-server-git-d6e9d50eece7328bae67ad2fe01ac59e33d8030b.tar.gz
added phantom_deliver. This does everything that deliver does but it doesn't actually read the message. This is useful if the same messages are being tracked in multiple different queues (eg a RAM queue and a disk-backed queue) and you want to mark the message delivered without it being retrieved. It still needs acking in the normal way.
-rw-r--r--src/rabbit_db_queue.erl31
-rw-r--r--src/rabbit_disk_queue.erl38
2 files changed, 47 insertions, 22 deletions
diff --git a/src/rabbit_db_queue.erl b/src/rabbit_db_queue.erl
index bd6820d5d0..495bdafb23 100644
--- a/src/rabbit_db_queue.erl
+++ b/src/rabbit_db_queue.erl
@@ -59,7 +59,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]).
+-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]).
-export([stop/0, stop_and_obliterate/0]).
@@ -80,6 +80,8 @@
-spec(deliver/1 :: (queue_name()) ->
{'empty' | {msg_id(), binary(), non_neg_integer(),
bool(), {msg_id(), seq_id()}}}).
+-spec(phantom_deliver/1 :: (queue_name()) ->
+ { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
@@ -102,6 +104,9 @@ publish(Q, MsgId, Msg) when is_binary(Msg) ->
deliver(Q) ->
gen_server:call(?SERVER, {deliver, Q}, infinity).
+phantom_deliver(Q) ->
+ gen_server:call(?SERVER, {phantom_deliver, Q}).
+
ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}).
@@ -136,7 +141,10 @@ init([DSN]) ->
{ok, State}.
handle_call({deliver, Q}, _From, State) ->
- {ok, Result, State1} = internal_deliver(Q, State),
+ {ok, Result, State1} = internal_deliver(Q, true, State),
+ {reply, Result, State1};
+handle_call({phantom_deliver, Q}, _From, State) ->
+ {ok, Result, State1} = internal_deliver(Q, false, State),
{reply, Result, State1};
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
{ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State),
@@ -232,7 +240,7 @@ hex_string_to_binary([A,B|Rest], Acc) ->
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, State = #dbstate { db_conn = Conn }) ->
+internal_deliver(Q, ReadMsg, State = #dbstate { db_conn = Conn }) ->
QStr = binary_to_escaped_string(term_to_binary(Q)),
case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of
{selected, _, []} ->
@@ -252,14 +260,19 @@ internal_deliver(Q, State = #dbstate { db_conn = Conn }) ->
MsgId = binary_to_term(hex_string_to_binary(MsgIdStr)),
%% yeah, this is really necessary. sigh
MsgIdStr2 = binary_to_escaped_string(term_to_binary(MsgId)),
- {selected, _, [{MsgBodyStr}]} =
- odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2),
odbc:sql_query(Conn, "update sequence set next_read = " ++ integer_to_list(ReadSeqId + 1) ++
" where queue = " ++ QStr),
- odbc:commit(Conn, commit),
- MsgBody = hex_string_to_binary(MsgBodyStr),
- BodySize = size(MsgBody),
- {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State}
+ if ReadMsg ->
+ {selected, _, [{MsgBodyStr}]} =
+ odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2),
+ odbc:commit(Conn, commit),
+ MsgBody = hex_string_to_binary(MsgBodyStr),
+ BodySize = size(MsgBody),
+ {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State};
+ true ->
+ odbc:commit(Conn, commit),
+ {ok, {MsgId, IsDelivered, {MsgId, ReadSeqId}}, State}
+ end
end
end.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index e3b47e89dc..1a19fd6f8e 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]).
+-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]).
-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
@@ -227,6 +227,8 @@
-spec(deliver/1 :: (queue_name()) ->
{'empty' | {msg_id(), binary(), non_neg_integer(),
bool(), {msg_id(), seq_id()}}}).
+-spec(phantom_deliver/1 :: (queue_name()) ->
+ { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
@@ -251,6 +253,9 @@ publish(Q, MsgId, Msg) when is_binary(Msg) ->
deliver(Q) ->
gen_server:call(?SERVER, {deliver, Q}, infinity).
+phantom_deliver(Q) ->
+ gen_server:call(?SERVER, {phantom_deliver, Q}).
+
ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}).
@@ -335,7 +340,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{ok, State1 #dqstate { current_file_handle = FileHdl }}.
handle_call({deliver, Q}, _From, State) ->
- {ok, Result, State1} = internal_deliver(Q, State),
+ {ok, Result, State1} = internal_deliver(Q, true, State),
+ {reply, Result, State1};
+handle_call({phantom_deliver, Q}, _From, State) ->
+ {ok, Result, State1} = internal_deliver(Q, false, State),
{reply, Result, State1};
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
{ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State),
@@ -465,7 +473,7 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mo
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, State = #dqstate { sequences = Sequences }) ->
+internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, empty, State};
[{Q, ReadSeqId, WriteSeqId}] ->
@@ -475,17 +483,21 @@ internal_deliver(Q, State = #dqstate { sequences = Sequences }) ->
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
[{MsgId, _RefCount, File, Offset, TotalSize}] =
dets_ets_lookup(State, MsgId),
- {FileHdl, State1} = get_read_handle(File, State),
- %% read the message
- {ok, {MsgBody, BodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize),
- if Delivered -> ok;
- true -> ok = mnesia:dirty_write(rabbit_disk_queue,
- Obj #dq_msg_loc {is_delivered = true})
- end,
true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
- {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
- State1}
+ ok =
+ if Delivered -> ok;
+ true ->
+ mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc {is_delivered = true})
+ end,
+ if ReadMsg ->
+ {FileHdl, State1} = get_read_handle(File, State),
+ {ok, {MsgBody, BodySize}} =
+ read_message_at_offset(FileHdl, Offset, TotalSize),
+ {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
+ State1};
+ true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State}
+ end
end
end.