summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-21 22:38:50 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-21 22:38:50 +0100
commitde20ca33aed6823d3d13b7dbdc30ec0869584269 (patch)
tree4b6b6b5889922a27a1fa687c447ba277646b2f2d /src
parent1a4427984277dff425b894972291ca3921eb7114 (diff)
downloadrabbitmq-server-git-de20ca33aed6823d3d13b7dbdc30ec0869584269.tar.gz
persistence of message props working for invariable queue
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_invariable_queue.erl13
-rw-r--r--src/rabbit_persister.erl21
2 files changed, 18 insertions, 16 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index d317b55c68..152d2a8766 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -104,7 +104,7 @@ publish(Msg, MsgProps, State = #iv_state { queue = Q,
qname = QName,
durable = IsDurable,
len = Len }) ->
- ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
Q1 = enqueue(Msg, MsgProps, false, Q),
State #iv_state { queue = Q1, len = Len + 1 }.
@@ -114,7 +114,7 @@ publish_delivered(true, Msg = #basic_message { guid = Guid },
MsgProps,
State = #iv_state { qname = QName, durable = IsDurable,
len = 0, pending_ack = PA }) ->
- ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
ok = persist_delivery(QName, IsDurable, false, Msg),
{Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
@@ -149,7 +149,7 @@ tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName,
durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
- ok = persist_message(QName, IsDurable, Txn, Msg),
+ ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps),
State.
tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
@@ -263,14 +263,15 @@ do_if_persistent(F, Txn, QName) ->
%%----------------------------------------------------------------------------
persist_message(QName, true, Txn, Msg = #basic_message {
- is_persistent = true }) ->
+ is_persistent = true }, MsgProps) ->
Msg1 = Msg #basic_message {
%% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
- [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]);
-persist_message(_QName, _IsDurable, _Txn, _Msg) ->
+ [{publish, Msg1, MsgProps,
+ {QName, Msg1 #basic_message.guid}}]);
+persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) ->
ok.
persist_delivery(QName, true, false, #basic_message { is_persistent = true,
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 66e5cf6311..dad8187335 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -173,9 +173,9 @@ handle_call(force_snapshot, _From, State) ->
handle_call({queue_content, QName}, _From,
State = #pstate{snapshot = #psnapshot{messages = Messages,
queues = Queues}}) ->
- MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}],
- do_reply([{ets:lookup_element(Messages, K, 2), D} ||
- {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))],
+ MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], [{{'$4', '$1', '$2', '$3'}}]}],
+ do_reply([{ets:lookup_element(Messages, K, 2), MP, D} ||
+ {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))],
State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -243,9 +243,9 @@ log_work(CreateWorkUnit, MessageList,
snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
Unit = CreateWorkUnit(
rabbit_misc:map_in_order(
- fun (M = {publish, Message, QK = {_QName, PKey}}) ->
+ fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) ->
case ets:lookup(Messages, PKey) of
- [_] -> {tied, QK};
+ [_] -> {tied, MsgProps, QK};
[] -> ets:insert(Messages, {PKey, Message}),
M
end;
@@ -356,7 +356,8 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
- PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
+ PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered,
+ _MsgProps, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues),
prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
@@ -474,14 +475,14 @@ perform_work(MessageList, Messages, Queues, SeqId) ->
perform_work_item(Item, Messages, Queues, NextSeqId)
end, SeqId, MessageList).
-perform_work_item({publish, Message, QK = {_QName, PKey}},
+perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}},
Messages, Queues, NextSeqId) ->
true = ets:insert(Messages, {PKey, Message}),
- true = ets:insert(Queues, {QK, false, NextSeqId}),
+ true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}),
NextSeqId + 1;
-perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) ->
- true = ets:insert(Queues, {QK, false, NextSeqId}),
+perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}),
NextSeqId + 1;
perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->