diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-09-17 11:04:39 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-09-17 11:04:39 +0100 |
| commit | 7fbc158b768ce687571221adae4d7e08f09f8b56 (patch) | |
| tree | 8a802344bcfbe11b4059deeb809ed6163f64bf24 | |
| parent | ee30296b7789b4aadc5c62d2d8ad6bd9877d0d04 (diff) | |
| download | rabbitmq-server-git-7fbc158b768ce687571221adae4d7e08f09f8b56.tar.gz | |
some cosmetic changes and the beginnings of invariable_queue with new api
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 |
3 files changed, 15 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8d5699bee2..1ea9d18154 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -402,9 +402,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. -deliver_from_queue_deliver(AckRequired, false, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4e0dad8422..44489bf939 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_invariable_queue). --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, @@ -89,9 +89,10 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) -> + fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered}, + Acc) -> Acc; - ({Msg = #basic_message { guid = Guid }, IsDelivered}, + ({Msg = #basic_message { guid = Guid }, _MsgProps, IsDelivered}, {AckTagsN, PAN}) -> ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} @@ -99,10 +100,13 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> +publish(Msg, MsgProps, State = #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg), - State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. + QueueItem = {Msg, MsgProps, false}, + State #iv_state { queue = queue:in(QueueItem, Q), len = Len + 1 }. publish_delivered(false, _Msg, State) -> {blank_ack, State}; @@ -118,8 +122,8 @@ fetch(_AckRequired, State = #iv_state { len = 0 }) -> fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, durable = IsDurable, pending_ack = PA }) -> - {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = - queue:out(Q), + {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}}, + Q1} = queue:out(Q), Len1 = Len - 1, ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = dict:store(Guid, Msg, PA), @@ -129,7 +133,7 @@ fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, [Guid], PA1), {blank_ack, PA} end, - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, MsgProps, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 805e89f839..fdead8f9e0 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1414,7 +1414,7 @@ test_backing_queue() -> application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit, infinity), passed = test_queue_index(), - passed = test_queue_index_props(), + passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, |
