summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl37
-rw-r--r--src/rabbit_variable_queue.erl9
2 files changed, 32 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2cee51f70d..abd09a6343 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -63,7 +63,7 @@
rate_timer_ref,
expiry_timer_ref,
stats_timer,
- ttl
+ ttl
}).
-record(consumer, {tag, ack_required}).
@@ -406,7 +406,7 @@ deliver_from_queue_pred(IsEmpty, _State) ->
deliver_from_queue_deliver(AckRequired, false,
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} =
+ {{Message, _MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} =
BQ:fetch(AckRequired, BQS),
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
@@ -440,7 +440,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- MsgProperties = new_msg_properties(State),
+ MsgProperties = new_msg_properties(State),
BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
@@ -577,7 +577,7 @@ calculate_msg_expiry(_State = #q{ttl = undefined}) ->
undefined;
calculate_msg_expiry(_State = #q{ttl = Ttl}) ->
Now = timer:now_diff(now(), {0,0,0}),
- Now - (Ttl * 1000).
+ Now + (Ttl * 1000).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -621,6 +621,24 @@ emit_stats(State) ->
[{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
%---------------------------------------------------------------------------
+fetch(AckRequired, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ case BQ:fetch(AckRequired, BQS) of
+ {empty, BQS1} = Result -> {empty, State#q{backing_queue_state = BQS1}};
+ {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} ->
+ case msg_expired(MsgProperties) of
+ true ->
+ fetch(AckRequired, State#q{backing_queue_state = BQS1});
+ false ->
+ {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}}
+ end
+ end.
+
+msg_expired(MsgProperties = #msg_properties{expiry = undefined}) ->
+ false;
+msg_expired(MsgProperties = #msg_properties{expiry=Expiry}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ Now > Expiry.
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -699,13 +717,12 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck}, _From,
- State = #q{q = #amqqueue{name = QName},
- backing_queue_state = BQS, backing_queue = BQ}) ->
+ State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
+ case fetch(AckRequired, State1) of
+ {empty, State2} -> reply(empty, State2);
+ {{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
store_ch_record(
@@ -713,7 +730,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1})
+ reply({ok, Remaining, Msg}, State2)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e0bc75b7e6..591052a601 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -249,7 +249,7 @@
is_delivered,
msg_on_disk,
index_on_disk,
- msg_properties
+ msg_properties
}).
-record(delta,
@@ -533,7 +533,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
{{value, MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk,
+ msg_properties = MsgProperties }},
Q4a} ->
%% 1. Mark it delivered if necessary
@@ -564,7 +565,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
- {{Msg, IsDelivered, AckTag, Len1},
+ {{Msg, MsgProperties, IsDelivered, AckTag, Len1},
a(State #vqstate { q4 = Q4a,
ram_msg_count = RamMsgCount - 1,
out_counter = OutCount + 1,
@@ -580,7 +581,7 @@ ack(AckTags, State) ->
AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
- MsgProperties,
+ MsgProperties,
State = #vqstate { durable = IsDurable,
msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),