summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-17 17:05:44 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-17 17:05:44 +0100
commit54c4957e9d44abe29982bb2d8313b4bc7ce56f6b (patch)
tree9ec9e2b26edf5eaa4853f4cc55e0127123e78034
parent4d85abee069bcfab253a7a9040ddcb08c6aa05b1 (diff)
downloadrabbitmq-server-git-54c4957e9d44abe29982bb2d8313b4bc7ce56f6b.tar.gz
publisher acks for persistent messages
Acknowledgements are sent to the producer in the following cases (assuming the channel in confirm mode): - a transient message has been received (i.e. it is ack'd immediately), - a basic return has been sent (the ack is sent immediately afterwards), - a persistent message has been consumed [by a consumer] or got [via basic.get] and ack'd by the client (or the retrieval was no-ack), - a persistent message has been written to disk (both the message and its index). At least in theory, every message published on a confirm channel will at some point be ack'd. A message is only ack'd once and the reason it was ack'd is hidden.
-rw-r--r--src/rabbit_amqqueue_process.erl54
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_variable_queue.erl22
3 files changed, 53 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f3fce61a03..8bed40bef4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -61,7 +61,9 @@
rate_timer_ref,
expiry_timer_ref,
stats_timer,
- guid_to_channel
+ guid_to_channel,
+ msgs_on_disk,
+ msg_indices_on_disk
}).
-record(consumer, {tag, ack_required}).
@@ -124,7 +126,9 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
stats_timer = rabbit_event:init_stats_timer(),
- guid_to_channel = dict:new()}, hibernate,
+ guid_to_channel = dict:new(),
+ msgs_on_disk = gb_sets:new(),
+ msg_indices_on_disk = gb_sets:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -402,21 +406,26 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
-confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) ->
+confirm_message_internal(Guid, State = #q { guid_to_channel = GTC,
+ msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD }) ->
case dict:find(Guid, GTC) of
{ok, {ChPid, MsgSeqNo}} ->
rabbit_channel:confirm(ChPid, MsgSeqNo),
- State#q{guid_to_channel = dict:erase(Guid, GTC)};
+ State #q { guid_to_channel = dict:erase(Guid, GTC),
+ msgs_on_disk = gb_sets:delete_any(Guid, MOD),
+ msg_indices_on_disk = gb_sets:delete_any(Guid, MIOD) };
_ ->
State
end.
maybe_record_confirm_message(undefined, _, _, State) ->
State;
-maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State) ->
- State#q{guid_to_channel =
- dict:store(Message#basic_message.guid, {ChPid, MsgSeqNo},
- State#q.guid_to_channel)}.
+maybe_record_confirm_message(MsgSeqNo,
+ #basic_message { guid = Guid },
+ ChPid, State) ->
+ State #q { guid_to_channel =
+ dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }.
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -847,6 +856,35 @@ handle_cast({confirm_messages, Guids}, State) ->
confirm_message_internal(Guid, State0)
end, State, Guids));
+handle_cast({msgs_written_to_disk, Guids},
+ State = #q{msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD}) ->
+ GuidSet = gb_sets:from_list(Guids),
+ ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD),
+ gb_sets:fold(fun (Guid, State0) ->
+ confirm_message_internal(Guid, State0)
+ end, State, ToConfirmMsgs),
+ noreply(State#q{msgs_on_disk =
+ gb_sets:difference(gb_sets:union(MOD, GuidSet),
+ ToConfirmMsgs),
+ msg_indices_on_disk =
+ gb_sets:difference(MIOD, ToConfirmMsgs)});
+
+handle_cast({msg_indices_written_to_disk, Guids},
+ State = #q{msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD}) ->
+ rabbit_log:info("Message indices written to disk: ~p~n", [Guids]),
+ GuidSet = gb_sets:from_list(Guids),
+ ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD),
+ gb_sets:fold(fun (Guid, State0) ->
+ confirm_message_internal(Guid, State0)
+ end, State, ToConfirmMsgs),
+ noreply(State#q{msgs_on_disk =
+ gb_sets:difference(MOD, ToConfirmMsgs),
+ msg_indices_on_disk =
+ gb_sets:difference(gb_sets:union(MIOD, GuidSet),
+ ToConfirmMsgs)});
+
handle_cast({reject, AckTags, Requeue, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 09eb7cfda2..9e917fe599 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -634,7 +634,6 @@ handle_cast({write, Pid, Guid, Msg},
%% New message, lots to do
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
- rabbit_log:info("message ~p written to disk~n", [Guid]),
ok = index_insert(#msg_location {
guid = Guid, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize },
@@ -826,7 +825,6 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs,
pid_to_fun = PTF,
pid_to_guids = PTG }) ->
- rabbit_log:info("msg_store syncing~ncallbacks: ~p~nguids: ~p~n", [dict:to_list(PTF), dict:to_list(PTG)]),
State1 = stop_sync_timer(State),
State2 = case Syncs of
[] -> State1;
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 2b0919a1f3..454193c22f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -369,6 +369,7 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
init(QueueName, IsDurable, Recover) ->
+ Self = self(),
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
QueueName, Recover,
@@ -377,7 +378,7 @@ init(QueueName, IsDurable, Recover) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end,
fun (Guids) ->
- rabbit_log:info("message indices ~p commited to disk~n", [Guids])
+ gen_server2:cast(Self, {msg_indices_written_to_disk, Guids})
end),
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
@@ -401,7 +402,7 @@ init(QueueName, IsDurable, Recover) ->
true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
false -> undefined
end,
- register_puback_callback(?PERSISTENT_MSG_STORE),
+ register_confirm_callback(?PERSISTENT_MSG_STORE),
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
q1 = queue:new(),
@@ -1072,24 +1073,15 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
msg_store_clients = MSCState1 }}.
%%----------------------------------------------------------------------------
-%% Internal gubbins for pubacks
+%% Internal gubbins for confirms
%%----------------------------------------------------------------------------
-register_puback_callback(MessageStore) ->
+register_confirm_callback(MessageStore) ->
+ Self = self(),
rabbit_msg_store:register_callback(
MessageStore,
fun (Guids) ->
- spawn(fun () -> ok = rabbit_misc:with_exit_handler(
- fun () ->
- rabbit_log:info("something bad happened while sending pubacks back to channel"),
- ok
- end,
- fun () ->
- lists:foreach(fun(G) ->
- rabbit_log:info("send puback back to channel for ~p~n", [G]) end, Guids),
- ok
- end)
- end)
+ gen_server2:cast(Self, {msgs_written_to_disk, Guids})
end).
%%----------------------------------------------------------------------------