summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-05 15:18:45 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-05 15:18:45 +0100
commit792d203882a9911a83c1baa396fec71d8f730670 (patch)
tree731418b65fc93effa6dd095cdf4f4504b6e3cda5
parent519e73a0ad74c25ca8169322a1cfe5f42b2f7ab9 (diff)
downloadrabbitmq-server-git-792d203882a9911a83c1baa396fec71d8f730670.tar.gz
Removed some transactions and made all transaction bodies idempotent. They were actually fine before: a) the rabbit_disk_queue table is local_content and b) only one process ever accesses that table - thus there is no reason why any transaction will ever retry. However, this change is probably still beneficial. The only slight loss is that tx-commit is no longer atomic (ref counting of messages in ets, not mnesia, was resulting in non idempotency, so moved outside the transaction). This means that you could have msgs in a tx committed, but the acks not enforced, in the event of power failure or other catastrophic event.
All tests pass.
-rw-r--r--src/rabbit_disk_queue.erl130
1 files changed, 56 insertions, 74 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index fe8c433c7b..75892f68db 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -980,11 +980,6 @@ internal_ack(Q, MsgSeqIds, State) ->
remove_messages(Q, MsgSeqIds, true, State).
%% Q is only needed if MnesiaDelete /= false
-%% called from ack with MnesiaDelete = true
-%% called from tx_commit with MnesiaDelete = txn
-%% called from tx_cancel with MnesiaDelete = false
-%% called from purge with MnesiaDelete = txn
-%% called from delete_queue with MnesiaDelete = txn
remove_messages(Q, MsgSeqIds, MnesiaDelete,
State = #dqstate { file_summary = FileSummary,
current_file_name = CurName
@@ -1092,8 +1087,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
State = #dqstate { sequences = Sequences }) ->
{InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- {atomic, {WriteSeqId, State1}} =
- mnesia:transaction(
+ WriteSeqId =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
{ok, WriteSeqId1} =
@@ -1107,9 +1102,9 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
}, write),
SeqId + 1}
end, {ok, InitWriteSeqId}, PubMsgIds),
- {ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
- {WriteSeqId1, State2}
+ WriteSeqId1
end),
+ {ok, State1} = remove_messages(Q, AckSeqIds, true, State),
true = case PubMsgIds of
[] -> true;
_ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId})
@@ -1162,17 +1157,18 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
%% as they have no concept of sequence id anyway).
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- {atomic, {WriteSeqId1, Q, State}} =
- mnesia:transaction(
+ {WriteSeqId1, Q, MsgIds} =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(fun requeue_message/2, {WriteSeqId, Q, State},
+ lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []},
MsgSeqIds)
end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}),
+ lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
{ok, State}.
-requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) ->
+requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) ->
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
ok = mnesia:write(rabbit_disk_queue,
@@ -1181,57 +1177,50 @@ requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) ->
},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write),
- decrement_cache(MsgId, State),
- {WriteSeqId + 1, Q, State}.
+ {WriteSeqId + 1, Q, [MsgId | Acc]}.
%% move the next N messages from the front of the queue to the back.
internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
if N >= (WriteSeqId - ReadSeqId) -> {ok, State};
true ->
- {atomic, {ReadSeqIdN, WriteSeqIdN}} =
- mnesia:transaction(
+ {ReadSeqIdN, WriteSeqIdN, MsgIds} =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- requeue_next_messages(Q, State, N, ReadSeqId, WriteSeqId)
+ requeue_next_messages(Q, N, ReadSeqId, WriteSeqId, [])
end
),
true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}),
+ lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
{ok, State}
end.
-requeue_next_messages(_Q, _State, 0, ReadSeq, WriteSeq) ->
- {ReadSeq, WriteSeq};
-requeue_next_messages(Q, State, N, ReadSeq, WriteSeq) ->
+requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) ->
+ {ReadSeq, WriteSeq, Acc};
+requeue_next_messages(Q, N, ReadSeq, WriteSeq, Acc) ->
[Obj = #dq_msg_loc { msg_id = MsgId }] =
mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write),
ok = mnesia:write(rabbit_disk_queue,
Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write),
- decrement_cache(MsgId, State),
- requeue_next_messages(Q, State, N - 1, ReadSeq + 1, WriteSeq + 1).
+ requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1, [MsgId | Acc]).
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, 0, State};
{ReadSeqId, WriteSeqId} ->
- {atomic, {ok, State1}} =
- mnesia:transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- {MsgSeqIds, WriteSeqId} =
- rabbit_misc:unfold(
- fun (SeqId) when SeqId == WriteSeqId -> false;
- (SeqId) ->
- [#dq_msg_loc { msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue,
- {Q, SeqId}, write),
- {true, {MsgId, SeqId}, SeqId + 1}
- end, ReadSeqId),
- remove_messages(Q, MsgSeqIds, txn, State)
- end),
+ {MsgSeqIds, WriteSeqId} =
+ rabbit_misc:unfold(
+ fun (SeqId) when SeqId == WriteSeqId -> false;
+ (SeqId) ->
+ [#dq_msg_loc { msg_id = MsgId }] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
+ {true, {MsgId, SeqId}, SeqId + 1}
+ end, ReadSeqId),
true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
+ {ok, State1} = remove_messages(Q, MsgSeqIds, true, State),
{ok, WriteSeqId - ReadSeqId, State1}
end.
@@ -1239,26 +1228,19 @@ internal_delete_queue(Q, State) ->
{ok, _Count, State1 = #dqstate { sequences = Sequences }} =
internal_purge(Q, State), %% remove everything undelivered
true = ets:delete(Sequences, Q),
- {atomic, {ok, State2}} =
- mnesia:transaction(
- fun() -> %% now remove everything already delivered
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- Objs =
- mnesia:match_object(
- rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, '_'},
- msg_id = '_',
- is_delivered = '_'
- },
- write),
- MsgSeqIds =
- lists:map(
- fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
- msg_id = MsgId }) ->
- {MsgId, SeqId} end, Objs),
- remove_messages(Q, MsgSeqIds, txn, State1)
- end),
- {ok, State2}.
+ %% now remove everything already delivered
+ Objs = mnesia:dirty_match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, '_'},
+ msg_id = '_',
+ is_delivered = '_'
+ }),
+ MsgSeqIds =
+ lists:map(
+ fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
+ msg_id = MsgId }) ->
+ {MsgId, SeqId} end, Objs),
+ remove_messages(Q, MsgSeqIds, true, State1).
internal_delete_non_durable_queues(
DurableQueues, State = #dqstate { sequences = Sequences }) ->
@@ -1563,8 +1545,8 @@ load_from_disk(State) ->
State1 = load_messages(undefined, Files, State),
%% Finally, check there is nothing in mnesia which we haven't
%% loaded
- {atomic, State2} =
- mnesia:transaction(
+ State2 =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
{State6, FinalQ, MsgSeqIds2, _Len} =
@@ -1605,7 +1587,7 @@ load_from_disk(State) ->
{ok, State8}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
- {atomic, true} = mnesia:transaction(
+ true = rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
mnesia:foldl(
@@ -1624,7 +1606,7 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
end
end, true, rabbit_disk_queue)
end),
- remove_gaps_in_sequences(State),
+ ok = remove_gaps_in_sequences(State),
State.
remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
@@ -1637,18 +1619,18 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
%% we could shuffle downwards. However, I think there's greater
%% likelihood of gaps being at the bottom rather than the top of
%% the queue, so shuffling up should be the better bet.
- {atomic, _} =
- mnesia:transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foreach(
- fun ({Q, ReadSeqId, WriteSeqId}) ->
- Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
- ReadSeqId1 = ReadSeqId + Gap,
- true = ets:insert(Sequences,
- {Q, ReadSeqId1, WriteSeqId})
- end, ets:match_object(Sequences, '_'))
- end).
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foreach(
+ fun ({Q, ReadSeqId, WriteSeqId}) ->
+ Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
+ ReadSeqId1 = ReadSeqId + Gap,
+ true = ets:insert(Sequences,
+ {Q, ReadSeqId1, WriteSeqId})
+ end, ets:match_object(Sequences, '_'))
+ end),
+ ok.
shuffle_up(_Q, SeqId, SeqId, Gap) ->
Gap;