summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl22
-rw-r--r--src/rabbit_invariable_queue.erl10
2 files changed, 17 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 51ea4825ae..1df0c054cd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -413,32 +413,32 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
-attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
+attempt_delivery(none, _ChPid, Message, MsgProps,
+ State = #q{backing_queue = BQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message, BQS),
+ BQ:publish_delivered(AckRequired, Message, MsgProps, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+attempt_delivery(Txn, ChPid, Message, MsgProps,
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- MsgProperties = new_msg_properties(State),
{true, State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}.
+ BQ:tx_publish(Txn, Message, MsgProps, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
- case attempt_delivery(Txn, ChPid, Message, State) of
+ MsgProps = new_msg_properties(State),
+ case attempt_delivery(Txn, ChPid, Message, MsgProps, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- MsgProperties = new_msg_properties(State),
BQS = BQ:publish(Message,
- MsgProperties,
+ MsgProps,
State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
@@ -718,7 +718,9 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
+ {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message,
+ new_msg_properties(State),
+ State),
reply(Delivered, NewState);
handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 2847136141..2993b325ce 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -176,8 +176,8 @@ tx_commit(Txn, Fun, MsgPropsFun,
PA1 = remove_acks(AckTags1, PA),
{Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) ->
MsgProps1 = MsgPropsFun(MsgProps),
- QN = enqueue(Msg, MsgProps1, false, Q),
- {QN, LenN + 1}
+ QM = enqueue(Msg, MsgProps1, false, QN),
+ {QM, LenN + 1}
end, {Q, Len}, PubsRev),
{AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
@@ -203,8 +203,7 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q,
State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
enqueue(Msg, MsgProps, IsDelivered, Q) ->
- I = {Msg, MsgProps, IsDelivered},
- queue:in(I, Q).
+ queue:in({Msg, MsgProps, IsDelivered}, Q).
len(#iv_state { len = Len }) -> Len.
@@ -280,7 +279,8 @@ persist_acks(QName, true, Txn, AckTags, PA) ->
persist_work(Txn, QName,
[{ack, {QName, Guid}} || Guid <- AckTags,
begin
- {ok, Msg} = dict:find(Guid, PA),
+ {ok, {Msg, _MsgProps}}
+ = dict:find(Guid, PA),
Msg #basic_message.is_persistent
end]);
persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->