diff options
| -rw-r--r-- | packaging/windows-exe/rabbitmq_nsi.in | 22 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server-ha.ocf | 11 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 283 |
9 files changed, 355 insertions, 42 deletions
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index 7ad711ba38..7aec1f1dc4 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -4,6 +4,7 @@ !include WinMessages.nsh !include FileFunc.nsh !include WordFunc.nsh +!include x64.nsh !define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"' !define uninstall "Software\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ" @@ -19,8 +20,10 @@ OutFile "rabbitmq-server-%%VERSION%%.exe" ; Icons !define MUI_ICON "rabbitmq.ico" -; The default installation directory -InstallDir "$PROGRAMFILES\RabbitMQ Server" +; The default installation directory is empty. The .onInit function +; below takes care of selecting the appropriate (32-bit vs. 64-bit) +; "Program Files". +InstallDir "" ; Registry key to check for directory (so if you install again, it will ; overwrite the old one automatically) @@ -86,9 +89,9 @@ Section "RabbitMQ Server (required)" Rabbit WriteRegStr HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" "Install_Dir" "$INSTDIR" ; Write the uninstall keys for Windows - WriteRegStr HKLM ${uninstall} "DisplayName" "RabbitMQ Server" + WriteRegStr HKLM ${uninstall} "DisplayName" "RabbitMQ Server %%VERSION%%" WriteRegStr HKLM ${uninstall} "UninstallString" "$INSTDIR\uninstall.exe" - WriteRegStr HKLM ${uninstall} "DisplayIcon" "$INSTDIR\uninstall.exe,0" + WriteRegStr HKLM ${uninstall} "DisplayIcon" "$INSTDIR\rabbitmq.ico" WriteRegStr HKLM ${uninstall} "Publisher" "Pivotal Software, Inc." WriteRegStr HKLM ${uninstall} "DisplayVersion" "%%VERSION%%" WriteRegDWORD HKLM ${uninstall} "NoModify" 1 @@ -168,6 +171,7 @@ Section "Uninstall" RMDir /r "$INSTDIR\rabbitmq_server-%%VERSION%%" Delete "$INSTDIR\rabbitmq.ico" Delete "$INSTDIR\uninstall.exe" + RMDir "$INSTDIR" ; Remove start menu items RMDir /r "$SMPROGRAMS\RabbitMQ Server" @@ -182,6 +186,16 @@ SectionEnd ; Functions Function .onInit + ; By default, always install in "\Program Files", not matter if we run + ; on a 32-bit or 64-bit Windows. + ${If} $INSTDIR == ""; + ${If} ${RunningX64} + StrCpy $INSTDIR "$PROGRAMFILES64\RabbitMQ Server" + ${Else} + StrCpy $INSTDIR "$PROGRAMFILES\RabbitMQ Server" + ${EndIf} + ${EndIf} + Call findErlang ReadRegStr $0 HKLM ${uninstall} "UninstallString" diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf index 42e5332367..1e4ebd0f1e 100755 --- a/scripts/rabbitmq-server-ha.ocf +++ b/scripts/rabbitmq-server-ha.ocf @@ -36,6 +36,7 @@ OCF_RESKEY_definitions_dump_file_default="/etc/rabbitmq/definitions" OCF_RESKEY_pid_file_default="/var/run/rabbitmq/pid" OCF_RESKEY_log_dir_default="/var/log/rabbitmq" OCF_RESKEY_mnesia_base_default="/var/lib/rabbitmq/mnesia" +OCF_RESKEY_host_ip_default="127.0.0.1" OCF_RESKEY_node_port_default=5672 OCF_RESKEY_erlang_cookie_default=false OCF_RESKEY_erlang_cookie_file_default="/var/lib/rabbitmq/.erlang.cookie" @@ -217,6 +218,14 @@ Base directory for storing Mnesia files <content type="boolean" default="${OCF_RESKEY_mnesia_base_default}" /> </parameter> +<parameter name="host_ip" unique="0" required="0"> +<longdesc lang="en"> +${OCF_RESKEY_binary} should listen on this IP address +</longdesc> +<shortdesc lang="en">${OCF_RESKEY_binary} should listen on this IP address</shortdesc> +<content type="boolean" default="${OCF_RESKEY_host_ip_default}" /> +</parameter> + <parameter name="node_port" unique="0" required="0"> <longdesc lang="en"> ${OCF_RESKEY_binary} should listen on this port @@ -1607,7 +1616,7 @@ action_notify() { ocf_log info "${LH} post-start end." if [ -s "${OCF_RESKEY_definitions_dump_file}" ] ; then ocf_log info "File ${OCF_RESKEY_definitions_dump_file} exists" - ocf_run curl -X POST -u $OCF_RESKEY_admin_user:$OCF_RESKEY_admin_password 127.0.0.1:15672/api/definitions --header "Content-Type:application/json" -d @$OCF_RESKEY_definitions_dump_file + ocf_run curl -X POST -u $OCF_RESKEY_admin_user:$OCF_RESKEY_admin_password $OCF_RESKEY_host_ip:15672/api/definitions --header "Content-Type:application/json" -d @$OCF_RESKEY_definitions_dump_file rc=$? if [ $rc -eq $OCF_SUCCESS ] ; then ocf_log info "RMQ definitions have imported succesfully." diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 452047fdb2..ee331180ed 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -318,7 +318,8 @@ process_args_policy(State = #q{q = Q, {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, {<<"max-length">>, fun res_min/2, fun init_max_length/2}, - {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}], + {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}, + {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}], drop_expired_msgs( lists:foldl(fun({Name, Resolve, Fun}, StateN) -> Fun(args_policy_lookup(Name, Resolve, Q), StateN) @@ -361,6 +362,13 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. +init_queue_mode(undefined, State) -> + State; +init_queue_mode(Mode, State = #q {backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:set_queue_mode(binary_to_existing_atom(Mode, utf8), BQS), + State#q{backing_queue_state = BQS1}. + reply(Reply, NewState) -> {NewState1, Timeout} = next_state(NewState), {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 2799d510d0..5d061252d0 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -769,8 +769,20 @@ call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) -> true -> lists:map(fun list_to_binary_utf8/1, Args); false -> Args end, - spawn_link(rabbit_cli, rpc_call, [Node, Mod, Fun, Args0, Ref = make_ref(), - Pid = self(), Timeout]), + Ref = make_ref(), + Pid = self(), + spawn_link( + fun () -> + case rabbit_cli:rpc_call(Node, Mod, Fun, Args0, + Ref, Pid, Timeout) of + {error, _} = Error -> + Pid ! {error, Error}; + {bad_argument, _} = Error -> + Pid ! {error, Error}; + _ -> + ok + end + end), rabbit_control_misc:wait_for_info_messages( Pid, Ref, InfoKeys, fun display_info_message/2, Timeout). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index ee3a097a80..e63ee107b0 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -23,7 +23,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, - msg_rates/1, info/2, invoke/3, is_duplicate/2]). + msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2]). -export([start/1, stop/0, delete_crashed/1]). @@ -485,6 +485,13 @@ is_duplicate(Message = #basic_message { id = MsgId }, confirmed = [MsgId | Confirmed] }} end. +set_queue_mode(Mode, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {set_queue_mode, Mode}), + BQS1 = BQ:set_queue_mode(Mode, BQS), + State #state { backing_queue_state = BQS1 }. + %% --------------------------------------------------------------------------- %% Other exported functions %% --------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 5da91c70c5..225c21dd54 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -256,13 +256,10 @@ handle_cast({gm, Instruction}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, State) -> %% Asynchronous, non-"mandatory", deliver mode. - case Flow of - %% We are acking messages to the channel process that sent us - %% the message delivery. See - %% rabbit_amqqueue_process:handle_ch_down for more info. - flow -> credit_flow:ack(Sender); - noflow -> ok - end, + %% We are acking messages to the channel process that sent us + %% the message delivery. See + %% rabbit_amqqueue_process:handle_ch_down for more info. + maybe_flow_ack(Sender, Flow), noreply(maybe_enqueue_message(Delivery, State)); handle_cast({sync_start, Ref, Syncer}, @@ -658,10 +655,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% need to send an ack for these messages since the channel is waiting %% for one for the via-GM case and we will not now receive one. promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) -> - case Flow of - flow -> credit_flow:ack(Sender); - noflow -> ok - end, + maybe_flow_ack(Sender, Flow), Delivery#delivery{mandatory = false}. noreply(State) -> @@ -948,10 +942,15 @@ process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:delete_and_terminate(Reason, BQS), - {stop, State #state { backing_queue_state = undefined }}. + {stop, State #state { backing_queue_state = undefined }}; +process_instruction({set_queue_mode, Mode}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:set_queue_mode(Mode, BQS), + {ok, State #state { backing_queue_state = BQS1 }}. -maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid); -maybe_flow_ack(_ChPid, noflow) -> ok. +maybe_flow_ack(Sender, flow) -> credit_flow:ack(Sender); +maybe_flow_ack(_Sender, noflow) -> ok. msg_ids_to_acktags(MsgIds, MA) -> {AckTags, MA1} = diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 65f3801e3e..a4e1e9be4a 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -35,7 +35,8 @@ register() -> {policy_validator, <<"message-ttl">>}, {policy_validator, <<"expires">>}, {policy_validator, <<"max-length">>}, - {policy_validator, <<"max-length-bytes">>}]], + {policy_validator, <<"max-length-bytes">>}, + {policy_validator, <<"queue-mode">>}]], ok. validate_policy(Terms) -> @@ -83,4 +84,11 @@ validate_policy0(<<"max-length-bytes">>, Value) when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"max-length-bytes">>, Value) -> - {error, "~p is not a valid maximum length in bytes", [Value]}. + {error, "~p is not a valid maximum length in bytes", [Value]}; + +validate_policy0(<<"queue-mode">>, <<"default">>) -> + ok; +validate_policy0(<<"queue-mode">>, <<"lazy">>) -> + ok; +validate_policy0(<<"queue-mode">>, Value) -> + {error, "~p is not a valid queue-mode value", [Value]}. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 4d638b334a..46a3991d88 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -40,7 +40,7 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - info/2, invoke/3, is_duplicate/2]). + info/2, invoke/3, is_duplicate/2, set_queue_mode/2]). %% for rabbit_mirror_queue_sync. -export([partition_publish_delivered_batch/1]). @@ -430,6 +430,11 @@ is_duplicate(Msg, State = #state{bq = BQ}) -> is_duplicate(Msg, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(is_duplicate(Msg, BQS)). +set_queue_mode(Mode, State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:set_queue_mode(Mode, BQSN) end, State); +set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(set_queue_mode(Mode, BQS)). + %%---------------------------------------------------------------------------- bq() -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d1f45ade4f..19878580db 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -25,7 +25,8 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]). + info/2, invoke/3, is_duplicate/2, set_queue_mode/2, + multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -302,7 +303,10 @@ disk_read_count, disk_write_count, - io_batch_size + io_batch_size, + + %% default queue or lazy queue + mode }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -399,7 +403,8 @@ disk_read_count :: non_neg_integer(), disk_write_count :: non_neg_integer(), - io_batch_size :: pos_integer()}). + io_batch_size :: pos_integer(), + mode :: 'default' | 'lazy' }). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -667,7 +672,8 @@ ack(AckTags, State) -> a(State1 #vqstate { index_state = IndexState1, ack_out_counter = AckOutCount + length(AckTags) })}. -requeue(AckTags, #vqstate { delta = Delta, +requeue(AckTags, #vqstate { mode = default, + delta = Delta, q3 = Q3, q4 = Q4, in_counter = InCounter, @@ -687,6 +693,23 @@ requeue(AckTags, #vqstate { delta = Delta, q3 = Q3a, q4 = Q4a, in_counter = InCounter + MsgCount, + len = Len + MsgCount })))}; +requeue(AckTags, #vqstate { mode = lazy, + delta = Delta, + q3 = Q3, + in_counter = InCounter, + len = Len } = State) -> + {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [], + delta_limit(Delta), + fun publish_beta/2, State), + {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds, + State1), + MsgCount = length(MsgIds1), + {MsgIds1, a(reduce_memory_use( + maybe_update_rates( + State2 #vqstate { delta = Delta1, + q3 = Q3a, + in_counter = InCounter + MsgCount, len = Len + MsgCount })))}. ackfold(MsgFun, Acc, State, AckTags) -> @@ -852,6 +875,7 @@ info(disk_writes, #vqstate{disk_write_count = Count}) -> Count; info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + mode = Mode, len = Len, target_ram_count = TargetRamCount, next_seq_id = NextSeqId, @@ -860,7 +884,8 @@ info(backing_queue_status, #vqstate { ack_in = AvgAckIngressRate, ack_out = AvgAckEgressRate }}) -> - [ {q1 , ?QUEUE:len(Q1)}, + [ {mode , Mode}, + {q1 , ?QUEUE:len(Q1)}, {q2 , ?QUEUE:len(Q2)}, {delta , Delta}, {q3 , ?QUEUE:len(Q3)}, @@ -880,6 +905,51 @@ invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. +set_queue_mode(Mode, State = #vqstate { mode = Mode }) -> + State; +set_queue_mode(lazy, State = #vqstate { + target_ram_count = TargetRamCount }) -> + %% To become a lazy queue we need to page everything to disk first. + State1 = convert_to_lazy(State), + %% restore the original target_ram_count + a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount }); +set_queue_mode(default, State) -> + %% becoming a default queue means loading messages from disk like + %% when a queue is recovered. + a(maybe_deltas_to_betas(State #vqstate { mode = default })); +set_queue_mode(_, State) -> + State. + +convert_to_lazy(State) -> + State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } = + set_ram_duration_target(0, State), + case Delta#delta.count + ?QUEUE:len(Q3) == Len of + true -> + State1; + false -> + %% When pushing messages to disk, we might have been + %% blocked by the msg_store, so we need to see if we have + %% to wait for more credit, and then keep paging messages. + %% + %% The amqqueue_process could have taken care of this, but + %% between the time it receives the bump_credit msg and + %% calls BQ:resume to keep paging messages to disk, some + %% other request may arrive to the BQ which at this moment + %% is not in a proper state for a lazy BQ (unless all + %% messages have been paged to disk already). + wait_for_msg_store_credit(), + convert_to_lazy(State1) + end. + +wait_for_msg_store_credit() -> + case credit_flow:blocked() of + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg) + end; + false -> ok + end. + %% Get the Timestamp property of the first msg, if present. This is %% the one with the oldest timestamp among the heads of the pending %% acks and unread queues. We can't check disk_pending_acks as these @@ -935,8 +1005,8 @@ get_collection_head(Col, IsEmpty, GetVal) -> %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- - a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + mode = default, len = Len, bytes = Bytes, unacked_bytes = UnackedBytes, @@ -951,9 +1021,16 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, E4 = ?QUEUE:is_empty(Q4), LZ = Len == 0, + %% if q1 has messages then q3 cannot be empty. See publish/6. true = E1 or not E3, + %% if q2 has messages then we have messages in delta (paged to + %% disk). See push_alphas_to_betas/2. true = E2 or not ED, + %% if delta has messages then q3 cannot be empty. This is enforced + %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages + %% are always kept on RAM. true = ED or not E3, + %% if the queue length is 0, then q3 and q4 must be empty. true = LZ == (E3 and E4), true = Len >= 0, @@ -966,6 +1043,53 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = RamBytes >= 0, true = RamBytes =< Bytes + UnackedBytes, + State; +a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + mode = lazy, + len = Len, + bytes = Bytes, + unacked_bytes = UnackedBytes, + persistent_count = PersistentCount, + persistent_bytes = PersistentBytes, + ram_msg_count = RamMsgCount, + ram_bytes = RamBytes}) -> + E1 = ?QUEUE:is_empty(Q1), + E2 = ?QUEUE:is_empty(Q2), + ED = Delta#delta.count == 0, + E3 = ?QUEUE:is_empty(Q3), + E4 = ?QUEUE:is_empty(Q4), + LZ = Len == 0, + L3 = ?QUEUE:len(Q3), + + %% q1 must always be empty, since q1 only gets messages during + %% publish, but for lazy queues messages go straight to delta. + true = E1, + + %% q2 only gets messages from q1 when push_alphas_to_betas is + %% called for a non empty delta, which won't be the case for a + %% lazy queue. This means q2 must always be empty. + true = E2, + + %% q4 must always be empty, since q1 only gets messages during + %% publish, but for lazy queues messages go straight to delta. + true = E4, + + %% if the queue is empty, then delta is empty and q3 is empty. + true = LZ == (ED and E3), + + %% There should be no messages in q1, q2, and q4 + true = Delta#delta.count + L3 == Len, + + true = Len >= 0, + true = Bytes >= 0, + true = UnackedBytes >= 0, + true = PersistentCount >= 0, + true = PersistentBytes >= 0, + true = RamMsgCount >= 0, + true = RamMsgCount =< Len, + true = RamBytes >= 0, + true = RamBytes =< Bytes + UnackedBytes, + State. d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) @@ -1203,7 +1327,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, disk_read_count = 0, disk_write_count = 0, - io_batch_size = IoBatchSize }, + io_batch_size = IoBatchSize, + + mode = default }, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -1214,7 +1340,7 @@ blank_rates(Now) -> timestamp = Now}. in_r(MsgStatus = #msg_status { msg = undefined }, - State = #vqstate { q3 = Q3, q4 = Q4 }) -> + State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) -> case ?QUEUE:is_empty(Q4) of true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = @@ -1223,10 +1349,24 @@ in_r(MsgStatus = #msg_status { msg = undefined }, stats(ready0, {MsgStatus, MsgStatus1}, State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) end; -in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. +in_r(MsgStatus, + State = #vqstate { mode = default, q4 = Q4 }) -> + State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }; +%% lazy queues +in_r(MsgStatus = #msg_status { seq_id = SeqId }, + State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) -> + case ?QUEUE:is_empty(Q3) of + true -> + {_MsgStatus1, State1} = + maybe_write_to_disk(true, true, MsgStatus, State), + State2 = stats(ready0, {MsgStatus, none}, State1), + Delta1 = expand_delta(SeqId, Delta), + State2 #vqstate{ delta = Delta1 }; + false -> + State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) } + end. -queue_out(State = #vqstate { q4 = Q4 }) -> +queue_out(State = #vqstate { mode = default, q4 = Q4 }) -> case ?QUEUE:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of @@ -1235,6 +1375,12 @@ queue_out(State = #vqstate { q4 = Q4 }) -> end; {{value, MsgStatus}, Q4a} -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} + end; +%% lazy queues +queue_out(State = #vqstate { mode = lazy }) -> + case fetch_from_q3(State) of + {empty, _State1} = Result -> Result; + {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end. read_msg(#msg_status{msg = undefined, @@ -1254,11 +1400,13 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, stats(Signs, Statuses, State) -> stats0(expand_signs(Signs), expand_statuses(Statuses), State). -expand_signs(ready0) -> {0, 0, true}; -expand_signs({A, B}) -> {A, B, false}. +expand_signs(ready0) -> {0, 0, true}; +expand_signs(lazy_pub) -> {1, 0, true}; +expand_signs({A, B}) -> {A, B, false}. expand_statuses({none, A}) -> {false, msg_in_ram(A), A}; expand_statuses({B, none}) -> {msg_in_ram(B), false, B}; +expand_statuses({lazy, A}) -> {false , false, A}; expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. %% In this function at least, we are religious: the variable name @@ -1546,10 +1694,12 @@ process_delivers_and_acks_fun(_) -> %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- + publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, _Flow, PersistFun, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + mode = default, qi_embed_msgs_below = IndexMaxSize, next_seq_id = SeqId, in_counter = InCount, @@ -1567,6 +1717,26 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, stats({1, 0}, {none, MsgStatus1}, State2#vqstate{ next_seq_id = SeqId + 1, in_counter = InCount1, + unconfirmed = UC1 }); +publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + IsDelivered, _ChPid, _Flow, PersistFun, + State = #vqstate { mode = lazy, + qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC, + delta = Delta }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), + {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), + Delta1 = expand_delta(SeqId, Delta), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + stats(lazy_pub, {lazy, m(MsgStatus1)}, + State1#vqstate{ delta = Delta1, + next_seq_id = SeqId + 1, + in_counter = InCount + 1, unconfirmed = UC1 }). batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) -> @@ -1578,7 +1748,8 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, _Flow, PersistFun, - State = #vqstate { qi_embed_msgs_below = IndexMaxSize, + State = #vqstate { mode = default, + qi_embed_msgs_below = IndexMaxSize, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, @@ -1594,6 +1765,29 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, out_counter = OutCount + 1, in_counter = InCount + 1, unconfirmed = UC1 }), + {SeqId, State3}; +publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, + id = MsgId }, + MsgProps = #message_properties { + needs_confirming = NeedsConfirming }, + _ChPid, _Flow, PersistFun, + State = #vqstate { mode = lazy, + qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), + {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), + State2 = record_pending_ack(m(MsgStatus1), State1), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + State3 = stats({0, 1}, {none, MsgStatus1}, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + unconfirmed = UC1 }), {SeqId, State3}. batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> @@ -2063,6 +2257,7 @@ ifold(Fun, Acc, Its, State) -> reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> State; reduce_memory_use(State = #vqstate { + mode = default, ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, @@ -2108,6 +2303,30 @@ reduce_memory_use(State = #vqstate { end, %% See rabbitmq-server-290 for the reasons behind this GC call. garbage_collect(), + State3; +%% When using lazy queues, there are no alphas, so we don't need to +%% call push_alphas_to_betas/2. +reduce_memory_use(State = #vqstate { + mode = lazy, + ram_pending_ack = RPA, + ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount }) -> + State1 = #vqstate { q3 = Q3 } = + case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of + 0 -> State; + S1 -> {_, State2} = limit_ram_acks(S1, State), + State2 + end, + + State3 = + case chunk_size(?QUEUE:len(Q3), + permitted_beta_count(State1)) of + 0 -> + State1; + S2 -> + push_betas_to_deltas(S2, State1) + end, + garbage_collect(), State3. limit_ram_acks(0, State) -> @@ -2131,6 +2350,9 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, permitted_beta_count(#vqstate { len = 0 }) -> infinity; +permitted_beta_count(#vqstate { mode = lazy, + target_ram_count = TargetRamCount}) -> + TargetRamCount; permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) -> lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]); permitted_beta_count(#vqstate { q1 = Q1, @@ -2148,7 +2370,8 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> Current - Permitted. -fetch_from_q3(State = #vqstate { q1 = Q1, +fetch_from_q3(State = #vqstate { mode = default, + q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, @@ -2178,6 +2401,19 @@ fetch_from_q3(State = #vqstate { q1 = Q1, State1 end, {loaded, {MsgStatus, State2}} + end; +%% lazy queues +fetch_from_q3(State = #vqstate { mode = lazy, + delta = #delta { count = DeltaCount }, + q3 = Q3 }) -> + case ?QUEUE:out(Q3) of + {empty, _Q3} when DeltaCount =:= 0 -> + {empty, State}; + {empty, _Q3} -> + fetch_from_q3(maybe_deltas_to_betas(State)); + {{value, MsgStatus}, Q3a} -> + State1 = State #vqstate { q3 = Q3a }, + {loaded, {MsgStatus, State1}} end. maybe_deltas_to_betas(State) -> @@ -2286,7 +2522,8 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> end end. -push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, +push_betas_to_deltas(Quota, State = #vqstate { mode = default, + q2 = Q2, delta = Delta, q3 = Q3}) -> PushState = {Quota, Delta, State}, @@ -2301,8 +2538,22 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, {_, Delta1, State1} = PushState2, State1 #vqstate { q2 = Q2a, delta = Delta1, + q3 = Q3a }; +%% In the case of lazy queues we want to page as many messages as +%% possible from q3. +push_betas_to_deltas(Quota, State = #vqstate { mode = lazy, + delta = Delta, + q3 = Q3}) -> + PushState = {Quota, Delta, State}, + {Q3a, PushState1} = push_betas_to_deltas( + fun ?QUEUE:out_r/1, + fun (Q2MinSeqId) -> Q2MinSeqId end, + Q3, PushState), + {_, Delta1, State1} = PushState1, + State1 #vqstate { delta = Delta1, q3 = Q3a }. + push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of true -> |
