summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-27 08:59:06 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-27 08:59:06 +0100
commite49f833515b536a51af2a1881fc095348d620c96 (patch)
treefad3d1b55d5215a5a0a6ae762a1cdfc47e143dda
parent8705e92302e760cdd160b995f2028d20b65035e2 (diff)
downloadrabbitmq-server-git-e49f833515b536a51af2a1881fc095348d620c96.tar.gz
solved a couple of bad_matches/function_clauses
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_variable_queue.erl4
3 files changed, 15 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8a9a293bad..eb34aeff47 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -417,8 +417,8 @@ maybe_record_confirm_message(undefined, _, _, State) ->
maybe_record_confirm_message(MsgSeqNo,
#basic_message { guid = Guid },
ChPid, State) ->
- State #q { guid_to_channel =
- dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }.
+ State #q { guid_to_channel =
+ dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }.
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -549,8 +549,9 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
case Fun(BQS) of
{BQS1, {confirm, Guids}} ->
- confirm_messages_internal(Guids,
- State #q { backing_queue_state = BQS1 });
+ run_message_queue(
+ confirm_messages_internal(Guids,
+ State #q { backing_queue_state = BQS1 }));
BQS1 ->
run_message_queue(State#q{backing_queue_state = BQS1})
end.
@@ -868,10 +869,11 @@ handle_cast({ack, Txn, AckTags, ChPid},
none ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- {NewBQS, AckdGuids} = BQ:ack(AckTags, BQS),
+ {NewBQS, {confirm, AckdGuids}} = BQ:ack(AckTags, BQS),
NewState =
confirm_messages_internal(AckdGuids,
- State #q { backing_queue_state = NewBQS }),
+ State #q { backing_queue_state =
+ NewBQS }),
{NewC, NewState};
_ ->
{C#cr{txn = Txn},
@@ -891,9 +893,10 @@ handle_cast({reject, AckTags, Requeue, ChPid},
store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> {BQS1, AckdGuids} = BQ:ack(AckTags, BQS),
- confirm_messages_internal(AckdGuids,
- State #q { backing_queue_state = BQS1 })
+ false -> {BQS1, {confirm, AckdGuids}} = BQ:ack(AckTags, BQS),
+ confirm_messages_internal(
+ AckdGuids,
+ State #q { backing_queue_state = BQS1 })
end)
end;
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 208f71f030..9e38a9766d 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -376,6 +376,7 @@ client_init(Server, Ref) ->
client_ref = Ref}.
client_terminate(CState, Server) ->
+ close_all_handles(CState),
ok = gen_server2:call(Server, {client_terminate, CState}, infinity).
client_delete_and_terminate(CState, Server, Ref) ->
@@ -637,7 +638,6 @@ handle_call({client_terminate, CState = #client_msstate { client_ref = CRef }},
_From,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
- ok = close_all_handles(CState),
reply(ok,
State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
cref_to_guids = dict:erase(CRef, CTG) }).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6521c54496..8b4f55c5e1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1129,7 +1129,7 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {State, []};
+ {State, {confirm, []}};
ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
@@ -1153,7 +1153,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
end,
{State2 #vqstate { index_state = IndexState1,
persistent_count = PCount1 },
- AckdGuids}.
+ {confirm, AckdGuids}}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,