summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-21 17:37:43 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-21 17:37:43 +0100
commitf56898059fe9ecd900d4d292e656c196b85284ef (patch)
treef2493765e2a7d77907d4817309d646ef95afa51e
parentb6356f82afd07d7c5533d6184d9a26672ab0dc9f (diff)
downloadrabbitmq-server-git-f56898059fe9ecd900d4d292e656c196b85284ef.tar.gz
stylistic issues, reworking handling of msg_properties
-rw-r--r--src/rabbit_amqqueue_process.erl26
1 files changed, 13 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f895385210..1aa1d05f4e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -413,32 +413,31 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
-attempt_delivery(none, _ChPid, Message, MsgProps,
- State = #q{backing_queue = BQ}) ->
+attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message, MsgProps, BQS),
+ BQ:publish_delivered(AckRequired, Message,
+ #msg_properties{}, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, MsgProps,
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
{true, State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, MsgProps, BQS)}}.
+ BQ:tx_publish(Txn, Message, #msg_properties{}, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
- MsgProps = new_msg_properties(State),
- case attempt_delivery(Txn, ChPid, Message, MsgProps, State) of
+ case attempt_delivery(Txn, ChPid, Message, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
BQS = BQ:publish(Message,
- MsgProps,
+ msg_properties(State),
State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
@@ -591,7 +590,7 @@ reset_msg_expiry_fun(State) ->
MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
end.
-new_msg_properties(State) ->
+msg_properties(State) ->
#msg_properties{expiry = calculate_msg_expiry(State)}.
calculate_msg_expiry(_State = #q{ttl = undefined}) ->
@@ -719,9 +718,10 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message,
- new_msg_properties(State),
- State),
+
+ %% we don't need an expiry here because messages are not being
+ %% enqueued, so we use an empty msg_properties.
+ {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
reply(Delivered, NewState);
handle_call({deliver, Txn, Message, ChPid}, _From, State) ->