diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-09-21 17:37:43 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-09-21 17:37:43 +0100 |
| commit | f56898059fe9ecd900d4d292e656c196b85284ef (patch) | |
| tree | f2493765e2a7d77907d4817309d646ef95afa51e | |
| parent | b6356f82afd07d7c5533d6184d9a26672ab0dc9f (diff) | |
| download | rabbitmq-server-git-f56898059fe9ecd900d4d292e656c196b85284ef.tar.gz | |
stylistic issues, reworking handling of msg_properties
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 26 |
1 files changed, 13 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f895385210..1aa1d05f4e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -413,32 +413,31 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, _ChPid, Message, MsgProps, - State = #q{backing_queue = BQ}) -> +attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, MsgProps, BQS), + BQ:publish_delivered(AckRequired, Message, + #msg_properties{}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, MsgProps, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, MsgProps, BQS)}}. + BQ:tx_publish(Txn, Message, #msg_properties{}, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - MsgProps = new_msg_properties(State), - case attempt_delivery(Txn, ChPid, Message, MsgProps, State) of + case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers BQS = BQ:publish(Message, - MsgProps, + msg_properties(State), State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -591,7 +590,7 @@ reset_msg_expiry_fun(State) -> MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} end. -new_msg_properties(State) -> +msg_properties(State) -> #msg_properties{expiry = calculate_msg_expiry(State)}. calculate_msg_expiry(_State = #q{ttl = undefined}) -> @@ -719,9 +718,10 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, - new_msg_properties(State), - State), + + %% we don't need an expiry here because messages are not being + %% enqueued, so we use an empty msg_properties. + {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); handle_call({deliver, Txn, Message, ChPid}, _From, State) -> |
