diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2013-01-09 12:27:09 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2013-01-09 12:27:09 +0000 |
| commit | 43ebab39dd210852a71fbc9b2a6ef0ab89fa3bcd (patch) | |
| tree | b6f08416af89c65dedab77446a1b268098f30a2c /src | |
| parent | 7d1ad8a494c051d2629c8931a3079d801c9173b2 (diff) | |
| parent | 6d9ab0f01da5f12ac82dad14553f07dc849df582 (diff) | |
| download | rabbitmq-server-git-43ebab39dd210852a71fbc9b2a6ef0ab89fa3bcd.tar.gz | |
Merged default into bug19375
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 40 |
2 files changed, 45 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 94150f1c69..c974e528ea 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -405,7 +405,8 @@ args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, - {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}]. + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-max-length">>, fun check_max_length_arg/2}]. check_string_arg({longstr, _}, _Args) -> ok; check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. @@ -416,6 +417,13 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_max_length_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_negative, Val}}; + Error -> Error + end. + check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 536ed48a99..0753d98c80 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,7 +54,8 @@ delayed_stop, queue_monitors, dlx, - dlx_routing_key + dlx_routing_key, + max_length }). -record(consumer, {tag, ack_required}). @@ -240,7 +241,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> [{<<"x-expires">>, fun init_expires/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-max-length">>, fun init_max_length/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -252,6 +254,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> init_dlx_routing_key(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. +init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -548,10 +552,32 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); - {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + {false, State2} -> + State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_drop_head(State2), BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + State3#q{backing_queue_state = BQS1}) + end. + +maybe_drop_head(State = #q{max_length = undefined}) -> + State; +maybe_drop_head(State = #q{max_length = MaxLen, + backing_queue = BQ, + backing_queue_state = BQS}) -> + case BQ:len(BQS) >= MaxLen of + true -> + with_dlx(State#q.dlx, + fun (X) -> + {ok, State1} = dead_letter_maxlen_msgs(X, State), + State1 + end, + fun () -> + {_, BQS1} = BQ:drop(false, BQS), + State#q{backing_queue_state = BQS1} + end); + false -> + State end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, @@ -741,6 +767,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. +dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) -> + dead_letter_msgs(fun (DLFun, Acc, BQS1) -> + {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1), + {ok, DLFun(Msg, AckTag, Acc), BQS2} + end, maxlen, X, State). + dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, publish_seqno = SeqNo0, unconfirmed = UC0, |
