diff options
| -rw-r--r-- | LICENSE-MPL-RabbitMQ | 2 | ||||
| -rwxr-xr-x | check_xref | 2 | ||||
| -rwxr-xr-x | codegen.py | 4 | ||||
| -rw-r--r-- | include/gm_specs.hrl | 2 | ||||
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | include/rabbit_cli.hrl | 2 | ||||
| -rw-r--r-- | include/rabbit_msg_store.hrl | 2 | ||||
| -rw-r--r-- | packaging/windows-exe/rabbitmq_nsi.in | 4 | ||||
| -rwxr-xr-x | scripts/rabbitmq-service.bat | 8 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 135 |
10 files changed, 101 insertions, 62 deletions
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ index bdbdbec5e1..82c7cf5419 100644 --- a/LICENSE-MPL-RabbitMQ +++ b/LICENSE-MPL-RabbitMQ @@ -446,7 +446,7 @@ EXHIBIT A -Mozilla Public License. The Original Code is RabbitMQ. - The Initial Developer of the Original Code is GoPivotal, Inc. + The Initial Developer of the Original Code is Pivotal Software, Inc. Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.'' [NOTE: The text of this Exhibit A may differ slightly from the text of diff --git a/check_xref b/check_xref index d9a483b85b..78f932dc89 100755 --- a/check_xref +++ b/check_xref @@ -14,7 +14,7 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% The Initial Developer of the Original Code is Pivotal Software, Inc. %% Copyright (c) 2010-2015 Pivotal Software, Inc. All rights reserved. %% diff --git a/codegen.py b/codegen.py index f48eba8f78..10829f29dc 100755 --- a/codegen.py +++ b/codegen.py @@ -11,7 +11,7 @@ ## ## The Original Code is RabbitMQ. ## -## The Initial Developer of the Original Code is GoPivotal, Inc. +## The Initial Developer of the Original Code is Pivotal Software, Inc. ## Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. ## @@ -106,7 +106,7 @@ def printFileHeader(): %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% The Initial Developer of the Original Code is Pivotal Software, Inc. %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. %%""" diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl index 5a98e7059b..bc20b4415d 100644 --- a/include/gm_specs.hrl +++ b/include/gm_specs.hrl @@ -10,7 +10,7 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% The Initial Developer of the Original Code is Pivotal Software, Inc. %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. %% diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 9ad99a754a..6c3b131b34 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -10,7 +10,7 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% The Initial Developer of the Original Code is Pivotal Software, Inc. %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. %% diff --git a/include/rabbit_cli.hrl b/include/rabbit_cli.hrl index 1bffc9a604..737bb4ea3d 100644 --- a/include/rabbit_cli.hrl +++ b/include/rabbit_cli.hrl @@ -10,7 +10,7 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% The Initial Developer of the Original Code is Pivotal Software, Inc. %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. %% diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl index 803ed6b7ce..8bcf2ce629 100644 --- a/include/rabbit_msg_store.hrl +++ b/include/rabbit_msg_store.hrl @@ -10,7 +10,7 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% The Initial Developer of the Original Code is Pivotal Software, Inc. %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. %% diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index fd5fc2330c..153ff0ef93 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -35,7 +35,7 @@ VIProductVersion "%%VERSION%%.0" VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductVersion" "%%VERSION%%" VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" "" -VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "GoPivotal, Inc" +VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "Pivotal Software, Inc" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ? VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved." VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server" @@ -89,7 +89,7 @@ Section "RabbitMQ Server (required)" Rabbit WriteRegStr HKLM ${uninstall} "DisplayName" "RabbitMQ Server" WriteRegStr HKLM ${uninstall} "UninstallString" "$INSTDIR\uninstall.exe" WriteRegStr HKLM ${uninstall} "DisplayIcon" "$INSTDIR\uninstall.exe,0" - WriteRegStr HKLM ${uninstall} "Publisher" "GoPivotal, Inc." + WriteRegStr HKLM ${uninstall} "Publisher" "Pivotal Software, Inc." WriteRegStr HKLM ${uninstall} "DisplayVersion" "%%VERSION%%" WriteRegDWORD HKLM ${uninstall} "NoModify" 1 WriteRegDWORD HKLM ${uninstall} "NoRepair" 1 diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index d6dd902ee9..0845bbf58e 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -146,7 +146,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( set RABBITMQ_START_RABBIT=
if "!RABBITMQ_NODE_ONLY!"=="" (
- set RABBITMQ_START_RABBIT=-s rabbit boot
+ set RABBITMQ_START_RABBIT=-s "!RABBITMQ_BOOT_MODULE!" boot
)
if "!RABBITMQ_IO_THREAD_POOL_SIZE!"=="" (
@@ -161,9 +161,10 @@ set ERLANG_SERVICE_ARGUMENTS= ^ +W w ^
+A "!RABBITMQ_IO_THREAD_POOL_SIZE!" ^
+P 1048576 ^
--kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
+-kernel inet_default_connect_options "[{nodelay,true}]" ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
@@ -177,7 +178,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
-!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
!RABBITMQ_DIST_ARG! ^
!STARVAR!
@@ -191,7 +191,7 @@ set ERLANG_SERVICE_ARGUMENTS=!ERLANG_SERVICE_ARGUMENTS:"=\"! -stopaction "rabbit:stop_and_halt()." ^
!RABBITMQ_NAME_TYPE! !RABBITMQ_NODENAME! ^
!CONSOLE_FLAG! ^
--comment "A robust and scalable messaging broker" ^
+-comment "Multi-protocol open source messaging broker" ^
-args "!ERLANG_SERVICE_ARGUMENTS!" > NUL
goto END
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 544b536aa2..6f36d4f0dc 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -179,7 +179,8 @@ max_journal_entries, on_sync, on_sync_msg, unconfirmed, unconfirmed_msg}). --record(segment, {num, path, journal_entries, unacked}). +-record(segment, {num, path, journal_entries, + entries_to_segment, unacked}). -include("rabbit.hrl"). @@ -194,10 +195,11 @@ -type(hdl() :: ('undefined' | any())). -type(segment() :: ('undefined' | - #segment { num :: non_neg_integer(), - path :: file:filename(), - journal_entries :: array:array(), - unacked :: non_neg_integer() + #segment { num :: non_neg_integer(), + path :: file:filename(), + journal_entries :: array:array(), + entries_to_segment :: array:array(), + unacked :: non_neg_integer() })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict:dict(), [segment()]}). @@ -650,30 +652,46 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, add_to_journal(RelSeq, Action, Segment = #segment { journal_entries = JEntries, + entries_to_segment = EToSeg, unacked = UnackedCount }) -> + + {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries), + + {JEntries1, EToSeg1} = + case Fun of + set -> + {array:set(RelSeq, Entry, JEntries), + array:set(RelSeq, entry_to_segment(RelSeq, Entry, []), + EToSeg)}; + reset -> + {array:reset(RelSeq, JEntries), + array:reset(RelSeq, EToSeg)} + end, + Segment #segment { - journal_entries = add_to_journal(RelSeq, Action, JEntries), + journal_entries = JEntries1, + entries_to_segment = EToSeg1, unacked = UnackedCount + case Action of ?PUB -> +1; del -> 0; ack -> -1 - end}; + end}. -add_to_journal(RelSeq, Action, JEntries) -> +action_to_entry(RelSeq, Action, JEntries) -> case array:get(RelSeq, JEntries) of undefined -> - array:set(RelSeq, - case Action of - ?PUB -> {Action, no_del, no_ack}; - del -> {no_pub, del, no_ack}; - ack -> {no_pub, no_del, ack} - end, JEntries); + {set, + case Action of + ?PUB -> {Action, no_del, no_ack}; + del -> {no_pub, del, no_ack}; + ack -> {no_pub, no_del, ack} + end}; ({Pub, no_del, no_ack}) when Action == del -> - array:set(RelSeq, {Pub, del, no_ack}, JEntries); + {set, {Pub, del, no_ack}}; ({no_pub, del, no_ack}) when Action == ack -> - array:set(RelSeq, {no_pub, del, ack}, JEntries); + {set, {no_pub, del, ack}}; ({?PUB, del, no_ack}) when Action == ack -> - array:reset(RelSeq, JEntries) + {reset, none} end. maybe_flush_journal(State) -> @@ -704,18 +722,23 @@ flush_journal(State = #qistate { segments = Segments }) -> notify_sync(State1 #qistate { dirty_count = 0 }). append_journal_to_segment(#segment { journal_entries = JEntries, + entries_to_segment = EToSeg, path = Path } = Segment) -> case array:sparse_size(JEntries) of 0 -> Segment; - _ -> Seg = array:sparse_foldr( - fun entry_to_segment/3, [], JEntries), - file_handle_cache_stats:update(queue_index_write), - - {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, - [{write_buffer, infinity}]), - file_handle_cache:append(Hdl, Seg), - ok = file_handle_cache:close(Hdl), - Segment #segment { journal_entries = array_new() } + _ -> + file_handle_cache_stats:update(queue_index_write), + + {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, + [{write_buffer, infinity}]), + %% the file_handle_cache also does a list reverse, so this + %% might not be required here, but before we were doing a + %% sparse_foldr, a lists:reverse/1 seems to be the correct + %% thing to do for now. + file_handle_cache:append(Hdl, lists:reverse(array:to_list(EToSeg))), + ok = file_handle_cache:close(Hdl), + Segment #segment { journal_entries = array_new(), + entries_to_segment = array_new([]) } end. get_journal_handle(State = #qistate { journal_handle = undefined, @@ -748,14 +771,16 @@ recover_journal(State) -> Segments1 = segment_map( fun (Segment = #segment { journal_entries = JEntries, + entries_to_segment = EToSeg, unacked = UnackedCountInJournal }) -> %% We want to keep ack'd entries in so that we can %% remove them if duplicates are in the journal. The %% counts here are purely from the segment itself. {SegEntries, UnackedCountInSeg} = load_segment(true, Segment), - {JEntries1, UnackedCountDuplicates} = - journal_minus_segment(JEntries, SegEntries), + {JEntries1, EToSeg1, UnackedCountDuplicates} = + journal_minus_segment(JEntries, EToSeg, SegEntries), Segment #segment { journal_entries = JEntries1, + entries_to_segment = EToSeg1, unacked = (UnackedCountInJournal + UnackedCountInSeg - UnackedCountDuplicates) } @@ -842,10 +867,11 @@ segment_find_or_new(Seg, Dir, Segments) -> {ok, Segment} -> Segment; error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION, Path = filename:join(Dir, SegName), - #segment { num = Seg, - path = Path, - journal_entries = array_new(), - unacked = 0 } + #segment { num = Seg, + path = Path, + journal_entries = array_new(), + entries_to_segment = array_new([]), + unacked = 0 } end. segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) -> @@ -885,20 +911,20 @@ segment_nums({Segments, CachedSegments}) -> segments_new() -> {dict:new(), []}. -entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) -> - Buf; -entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) -> +entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) -> + Initial; +entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) -> %% NB: we are assembling the segment in reverse order here, so %% del/ack comes first. Buf1 = case {Del, Ack} of {no_del, no_ack} -> - Buf; + Initial; _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, case {Del, Ack} of - {del, ack} -> [[Binary, Binary] | Buf]; - _ -> [Binary | Buf] + {del, ack} -> [[Binary, Binary] | Initial]; + _ -> [Binary | Initial] end end, case Pub of @@ -987,7 +1013,10 @@ add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) -> end. array_new() -> - array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). + array_new(undefined). + +array_new(Default) -> + array:new([{default, Default}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). bool_to_int(true ) -> 1; bool_to_int(false) -> 0. @@ -1033,19 +1062,29 @@ segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) -> %% Remove from the journal entries for a segment, items that are %% duplicates of entries found in the segment itself. Used on start up %% to clean up the journal. -journal_minus_segment(JEntries, SegEntries) -> +%% +%% We need to update the entries_to_segment since they are just a +%% cache of what's on the journal. +journal_minus_segment(JEntries, EToSeg, SegEntries) -> array:sparse_foldl( - fun (RelSeq, JObj, {JEntriesOut, UnackedRemoved}) -> + fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) -> SegEntry = array:get(RelSeq, SegEntries), {Obj, UnackedRemovedDelta} = journal_minus_segment1(JObj, SegEntry), - {case Obj of - keep -> JEntriesOut; - undefined -> array:reset(RelSeq, JEntriesOut); - _ -> array:set(RelSeq, Obj, JEntriesOut) - end, - UnackedRemoved + UnackedRemovedDelta} - end, {JEntries, 0}, JEntries). + {JEntriesOut1, EToSegOut1} = + case Obj of + keep -> + {JEntriesOut, EToSegOut}; + undefined -> + {array:reset(RelSeq, JEntriesOut), + array:reset(RelSeq, EToSegOut)}; + _ -> + {array:set(RelSeq, Obj, JEntriesOut), + array:set(RelSeq, entry_to_segment(RelSeq, Obj, []), + EToSegOut)} + end, + {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta} + end, {JEntries, EToSeg, 0}, JEntries). %% Here, the result is a tuple with the first element containing the %% item we are adding to or modifying in the (initially fresh) journal |
