diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-09-21 22:38:50 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-09-21 22:38:50 +0100 |
| commit | de20ca33aed6823d3d13b7dbdc30ec0869584269 (patch) | |
| tree | 4b6b6b5889922a27a1fa687c447ba277646b2f2d /src | |
| parent | 1a4427984277dff425b894972291ca3921eb7114 (diff) | |
| download | rabbitmq-server-git-de20ca33aed6823d3d13b7dbdc30ec0869584269.tar.gz | |
persistence of message props working for invariable queue
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 21 |
2 files changed, 18 insertions, 16 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index d317b55c68..152d2a8766 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -104,7 +104,7 @@ publish(Msg, MsgProps, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, len = Len }) -> - ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_message(QName, IsDurable, none, Msg, MsgProps), Q1 = enqueue(Msg, MsgProps, false, Q), State #iv_state { queue = Q1, len = Len + 1 }. @@ -114,7 +114,7 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, MsgProps, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> - ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_message(QName, IsDurable, none, Msg, MsgProps), ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. @@ -149,7 +149,7 @@ tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - ok = persist_message(QName, IsDurable, Txn, Msg), + ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps), State. tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, @@ -263,14 +263,15 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- persist_message(QName, true, Txn, Msg = #basic_message { - is_persistent = true }) -> + is_persistent = true }, MsgProps) -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); -persist_message(_QName, _IsDurable, _Txn, _Msg) -> + [{publish, Msg1, MsgProps, + {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) -> ok. persist_delivery(QName, true, false, #basic_message { is_persistent = true, diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 66e5cf6311..dad8187335 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -173,9 +173,9 @@ handle_call(force_snapshot, _From, State) -> handle_call({queue_content, QName}, _From, State = #pstate{snapshot = #psnapshot{messages = Messages, queues = Queues}}) -> - MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}], - do_reply([{ets:lookup_element(Messages, K, 2), D} || - {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))], + MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], [{{'$4', '$1', '$2', '$3'}}]}], + do_reply([{ets:lookup_element(Messages, K, 2), MP, D} || + {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))], State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -243,9 +243,9 @@ log_work(CreateWorkUnit, MessageList, snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( - fun (M = {publish, Message, QK = {_QName, PKey}}) -> + fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) -> case ets:lookup(Messages, PKey) of - [_] -> {tied, QK}; + [_] -> {tied, MsgProps, QK}; [] -> ets:insert(Messages, {PKey, Message}), M end; @@ -356,7 +356,8 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore - PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> + PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, + _MsgProps, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues), prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), @@ -474,14 +475,14 @@ perform_work(MessageList, Messages, Queues, SeqId) -> perform_work_item(Item, Messages, Queues, NextSeqId) end, SeqId, MessageList). -perform_work_item({publish, Message, QK = {_QName, PKey}}, +perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}}, Messages, Queues, NextSeqId) -> true = ets:insert(Messages, {PKey, Message}), - true = ets:insert(Queues, {QK, false, NextSeqId}), + true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), NextSeqId + 1; -perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> - true = ets:insert(Queues, {QK, false, NextSeqId}), +perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), NextSeqId + 1; perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> |
