diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 10 |
2 files changed, 17 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 51ea4825ae..1df0c054cd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -413,32 +413,32 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(none, _ChPid, Message, MsgProps, + State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, BQS), + BQ:publish_delivered(AckRequired, Message, MsgProps, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +attempt_delivery(Txn, ChPid, Message, MsgProps, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - MsgProperties = new_msg_properties(State), {true, State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. + BQ:tx_publish(Txn, Message, MsgProps, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Txn, ChPid, Message, State) of + MsgProps = new_msg_properties(State), + case attempt_delivery(Txn, ChPid, Message, MsgProps, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - MsgProperties = new_msg_properties(State), BQS = BQ:publish(Message, - MsgProperties, + MsgProps, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -718,7 +718,9 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), + {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, + new_msg_properties(State), + State), reply(Delivered, NewState); handle_call({deliver, Txn, Message, ChPid}, _From, State) -> diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 2847136141..2993b325ce 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -176,8 +176,8 @@ tx_commit(Txn, Fun, MsgPropsFun, PA1 = remove_acks(AckTags1, PA), {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> MsgProps1 = MsgPropsFun(MsgProps), - QN = enqueue(Msg, MsgProps1, false, Q), - {QN, LenN + 1} + QM = enqueue(Msg, MsgProps1, false, QN), + {QM, LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. @@ -203,8 +203,7 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. enqueue(Msg, MsgProps, IsDelivered, Q) -> - I = {Msg, MsgProps, IsDelivered}, - queue:in(I, Q). + queue:in({Msg, MsgProps, IsDelivered}, Q). len(#iv_state { len = Len }) -> Len. @@ -280,7 +279,8 @@ persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin - {ok, Msg} = dict:find(Guid, PA), + {ok, {Msg, _MsgProps}} + = dict:find(Guid, PA), Msg #basic_message.is_persistent end]); persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> |
