summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-17 11:04:39 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-17 11:04:39 +0100
commit7fbc158b768ce687571221adae4d7e08f09f8b56 (patch)
tree8a802344bcfbe11b4059deeb809ed6163f64bf24
parentee30296b7789b4aadc5c62d2d8ad6bd9877d0d04 (diff)
downloadrabbitmq-server-git-7fbc158b768ce687571221adae4d7e08f09f8b56.tar.gz
some cosmetic changes and the beginnings of invariable_queue with new api
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_invariable_queue.erl22
-rw-r--r--src/rabbit_tests.erl2
3 files changed, 15 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8d5699bee2..1ea9d18154 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -402,9 +402,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_from_queue_pred(IsEmpty, _State) ->
not IsEmpty.
-deliver_from_queue_deliver(AckRequired, false,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 4e0dad8422..44489bf939 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_invariable_queue).
--export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
+-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
@@ -89,9 +89,10 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
%% We do not purge messages pending acks.
{AckTags, PA} =
rabbit_misc:queue_fold(
- fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) ->
+ fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered},
+ Acc) ->
Acc;
- ({Msg = #basic_message { guid = Guid }, IsDelivered},
+ ({Msg = #basic_message { guid = Guid }, _MsgProps, IsDelivered},
{AckTagsN, PAN}) ->
ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
{[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
@@ -99,10 +100,13 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
- len = Len }) ->
+publish(Msg, MsgProps, State = #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = Len }) ->
ok = persist_message(QName, IsDurable, none, Msg),
- State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
+ QueueItem = {Msg, MsgProps, false},
+ State #iv_state { queue = queue:in(QueueItem, Q), len = Len + 1 }.
publish_delivered(false, _Msg, State) ->
{blank_ack, State};
@@ -118,8 +122,8 @@ fetch(_AckRequired, State = #iv_state { len = 0 }) ->
fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
durable = IsDurable,
pending_ack = PA }) ->
- {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
- queue:out(Q),
+ {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}},
+ Q1} = queue:out(Q),
Len1 = Len - 1,
ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
PA1 = dict:store(Guid, Msg, PA),
@@ -129,7 +133,7 @@ fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
[Guid], PA1),
{blank_ack, PA}
end,
- {{Msg, IsDelivered, AckTag, Len1},
+ {{Msg, MsgProps, IsDelivered, AckTag, Len1},
State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 805e89f839..fdead8f9e0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1414,7 +1414,7 @@ test_backing_queue() ->
application:set_env(rabbit, msg_store_file_size_limit,
FileSizeLimit, infinity),
passed = test_queue_index(),
- passed = test_queue_index_props(),
+ passed = test_queue_index_props(),
passed = test_variable_queue(),
passed = test_queue_recover(),
application:set_env(rabbit, queue_index_max_journal_entries,