diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 9 |
2 files changed, 32 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2cee51f70d..abd09a6343 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -63,7 +63,7 @@ rate_timer_ref, expiry_timer_ref, stats_timer, - ttl + ttl }). -record(consumer, {tag, ack_required}). @@ -406,7 +406,7 @@ deliver_from_queue_pred(IsEmpty, _State) -> deliver_from_queue_deliver(AckRequired, false, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {{Message, IsDelivered, AckTag, Remaining}, BQS1} = + {{Message, _MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. @@ -440,7 +440,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - MsgProperties = new_msg_properties(State), + MsgProperties = new_msg_properties(State), BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -577,7 +577,7 @@ calculate_msg_expiry(_State = #q{ttl = undefined}) -> undefined; calculate_msg_expiry(_State = #q{ttl = Ttl}) -> Now = timer:now_diff(now(), {0,0,0}), - Now - (Ttl * 1000). + Now + (Ttl * 1000). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -621,6 +621,24 @@ emit_stats(State) -> [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). %--------------------------------------------------------------------------- +fetch(AckRequired, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} = Result -> {empty, State#q{backing_queue_state = BQS1}}; + {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> + case msg_expired(MsgProperties) of + true -> + fetch(AckRequired, State#q{backing_queue_state = BQS1}); + false -> + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + end + end. + +msg_expired(MsgProperties = #msg_properties{expiry = undefined}) -> + false; +msg_expired(MsgProperties = #msg_properties{expiry=Expiry}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now > Expiry. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -699,13 +717,12 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck}, _From, - State = #q{q = #amqqueue{name = QName}, - backing_queue_state = BQS, backing_queue = BQ}) -> + State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); - {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + case fetch(AckRequired, State1) of + {empty, State2} -> reply(empty, State2); + {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), store_ch_record( @@ -713,7 +730,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e0bc75b7e6..591052a601 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -249,7 +249,7 @@ is_delivered, msg_on_disk, index_on_disk, - msg_properties + msg_properties }). -record(delta, @@ -533,7 +533,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, {{value, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, + msg_properties = MsgProperties }}, Q4a} -> %% 1. Mark it delivered if necessary @@ -564,7 +565,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, MsgProperties, IsDelivered, AckTag, Len1}, a(State #vqstate { q4 = Q4a, ram_msg_count = RamMsgCount - 1, out_counter = OutCount + 1, @@ -580,7 +581,7 @@ ack(AckTags, State) -> AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties, + MsgProperties, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), |
