summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-19 13:22:38 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-19 13:22:38 +0100
commit003b4167fef45b15bf8a123aa07ecda0b1660500 (patch)
treef882a24d4c228c2e3b1f7b94a61cf3cd9a6600bb
parentb81f6ccf7329f5c3944bc8e63b4cfc3135db5112 (diff)
downloadrabbitmq-server-git-003b4167fef45b15bf8a123aa07ecda0b1660500.tar.gz
Some initial reworkings, making sure all messages have a guid, and moving the persister flag around slightly. Also various weird comments appearing in _process.erl for me!
-rw-r--r--include/rabbit.hrl5
-rw-r--r--src/rabbit_amqqueue_process.erl44
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_exchange.erl4
4 files changed, 30 insertions, 30 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 44e1368460..6212d4f327 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,7 +62,7 @@
-record(listener, {node, protocol, host, port}).
--record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}).
-record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id}).
@@ -134,7 +134,8 @@
#basic_message{exchange_name :: exchange_name(),
routing_key :: routing_key(),
content :: content(),
- persistent_key :: maybe(pkey())}).
+ guid :: guid(),
+ is_persistent :: bool()}).
-type(message() :: basic_message()).
%% this really should be an abstract type
-type(msg_id() :: non_neg_integer()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c390b2b7e4..69edb64fbc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -203,15 +203,15 @@ attempt_delivery(none, Message, State) ->
{offered, false, State1} ->
{true, State1};
{offered, true, State1} ->
- persist_message(none, qname(State), Message),
- persist_delivery(qname(State), Message, false),
+ persist_message(none, qname(State), Message), %% DQ HERE
+ persist_delivery(qname(State), Message, false), %% DQ HERE
{true, State1};
{not_offered, State1} ->
{false, State1}
end;
attempt_delivery(Txn, Message, State) ->
- persist_message(Txn, qname(State), Message),
- record_pending_message(Txn, Message),
+ persist_message(Txn, qname(State), Message), %% DQ tx_commit and store msgid in txn map
+ record_pending_message(Txn, Message), %% DQ seems to be done here!
{true, State}.
deliver_or_enqueue(Txn, Message, State) ->
@@ -219,8 +219,8 @@ deliver_or_enqueue(Txn, Message, State) ->
{true, NewState} ->
{true, NewState};
{false, NewState} ->
- persist_message(Txn, qname(State), Message),
- NewMB = queue:in({Message, false}, NewState#q.message_buffer),
+ persist_message(Txn, qname(State), Message), %% DQ Txn must be false here
+ NewMB = queue:in({Message, false}, NewState#q.message_buffer), %% DQ magic here
{false, NewState#q{message_buffer = NewMB}}
end.
@@ -302,7 +302,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
case check_auto_delete(
deliver_or_enqueue_n(
[{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)],
+ {_Messsage_id, Message} <- dict:to_list(UAM)], %% DQ alter all this stuff?
State#q{
exclusive_consumer = case Holder of
{ChPid, _} -> none;
@@ -343,10 +343,10 @@ run_poke_burst(MessageBuffer, State) ->
{{value, {Message, Delivered}}, BufferTail} ->
case deliver_immediately(Message, Delivered, State) of
{offered, true, NewState} ->
- persist_delivery(qname(State), Message, Delivered),
+ persist_delivery(qname(State), Message, Delivered), %% DQ ack needed
run_poke_burst(BufferTail, NewState);
{offered, false, NewState} ->
- persist_auto_ack(qname(State), Message),
+ persist_auto_ack(qname(State), Message), %% DQ record? We don't persist acks anyway now...
run_poke_burst(BufferTail, NewState);
{not_offered, NewState} ->
NewState#q{message_buffer = MessageBuffer}
@@ -371,7 +371,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-persist_message(_Txn, _QName, #basic_message{persistent_key = none}) ->
+persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> %% DQ
ok;
persist_message(Txn, QName, Message) ->
M = Message#basic_message{
@@ -379,29 +379,29 @@ persist_message(Txn, QName, Message) ->
content = rabbit_binary_parser:clear_decoded_content(
Message#basic_message.content)},
persist_work(Txn, QName,
- [{publish, M, {QName, M#basic_message.persistent_key}}]).
+ [{publish, M, {QName, M#basic_message.guid}}]).
-persist_delivery(_QName, _Message,
+persist_delivery(_QName, _Message, %% DQ
true) ->
ok;
-persist_delivery(_QName, #basic_message{persistent_key = none},
+persist_delivery(_QName, #basic_message{is_persistent = false}, %% DQ
_Delivered) ->
ok;
-persist_delivery(QName, #basic_message{persistent_key = PKey},
+persist_delivery(QName, #basic_message{guid = MsgId}, %% DQ
_Delivered) ->
- persist_work(none, QName, [{deliver, {QName, PKey}}]).
+ persist_work(none, QName, [{deliver, {QName, MsgId}}]).
-persist_acks(Txn, QName, Messages) ->
+persist_acks(Txn, QName, Messages) -> %% DQ
persist_work(Txn, QName,
- [{ack, {QName, PKey}} ||
- #basic_message{persistent_key = PKey} <- Messages,
- PKey =/= none]).
+ [{ack, {QName, MsgId}} ||
+ #basic_message{guid = MsgId, is_persistent = P} <- Messages,
+ P]).
-persist_auto_ack(_QName, #basic_message{persistent_key = none}) ->
+persist_auto_ack(_QName, #basic_message{is_persistent = false}) ->
ok;
-persist_auto_ack(QName, #basic_message{persistent_key = PKey}) ->
+persist_auto_ack(QName, #basic_message{is_persistent = true, guid = MsgId}) ->
%% auto-acks are always non-transactional
- rabbit_persister:dirty_work([{ack, {QName, PKey}}]).
+ rabbit_persister:dirty_work([{ack, {QName, MsgId}}]).
persist_work(_Txn,_QName, []) ->
ok;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7574cd673a..aeb15bd1b9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -313,15 +313,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- PersistentKey = case is_message_persistent(DecodedContent) of
- true -> rabbit_guid:guid();
- false -> none
- end,
{noreply, publish(Mandatory, Immediate,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
- persistent_key = PersistentKey},
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent)},
rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a57e8076bf..9b3bbb1851 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -204,7 +204,9 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKeyBin,
content = Content,
- persistent_key = none},
+ is_persistent = false,
+ guid = rabbit_guid:guid()
+ },
simple_publish(Mandatory, Immediate, Message).
%% Usable by Erlang code that wants to publish messages.