diff options
31 files changed, 1097 insertions, 198 deletions
@@ -25,6 +25,9 @@ syntax: regexp ^packaging/macports/macports$ ^packaging/generic-unix/rabbitmq-server-generic-unix-.*\.tar\.gz$ ^packaging/windows/rabbitmq-server-windows-.*\.zip$ +^packaging/windows-exe/rabbitmq_server-.*$ +^packaging/windows-exe/rabbitmq-.*\.nsi$ +^packaging/windows-exe/rabbitmq-server-.*\.exe$ ^docs/.*\.[15]\.gz$ ^docs/.*\.man\.xml$ @@ -41,14 +41,12 @@ RABBIT_PLT=rabbit.plt ifndef USE_SPECS # our type specs rely on features and bug fixes in dialyzer that are -# only available in R14A upwards (R13B04 is erts 5.7.5) -# -# NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.5" ]; then echo "true"; else echo "false"; fi) +# only available in R14A upwards (R14A is erts 5.8) +USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8]), halt().') endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests -ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(shell [ $(USE_SPECS) = "true" ] && echo "-Duse_specs") +ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs) VERSION=0.0.0 TARBALL_NAME=rabbitmq-server-$(VERSION) @@ -314,3 +312,4 @@ endif ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" -include $(DEPS_FILE) endif + diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index d3e9d84b4b..f837684c60 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -8,7 +8,8 @@ rabbit_node_monitor, rabbit_router, rabbit_sup, - rabbit_tcp_client_sup]}, + rabbit_tcp_client_sup, + rabbit_direct_client_sup]}, {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, %% we also depend on crypto, public_key and ssl but they shouldn't be %% in here as we don't actually want to start it @@ -33,4 +34,11 @@ {collect_statistics, none}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, - {delegate_count, 16}]}]}. + {delegate_count, 16}, + {tcp_listen_options, [binary, + {packet, raw}, + {reuseaddr, true}, + {backlog, 128}, + {nodelay, true}, + {exit_on_close, false}]} + ]}]}. diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index b37f7ab1fa..473168644a 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -124,6 +124,12 @@ done rm -rf %{buildroot} %changelog +* Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1 +- New Upstream Release + +* Tue Feb 1 2011 simon@rabbitmq.com 2.3.0-1 +- New Upstream Release + * Mon Nov 29 2010 rob@rabbitmq.com 2.2.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index a60e691d15..12165dc0ac 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,15 @@ +rabbitmq-server (2.3.1-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Thu, 03 Feb 2011 12:43:56 +0000 + +rabbitmq-server (2.3.0-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Tue, 01 Feb 2011 12:52:16 +0000 + rabbitmq-server (2.2.0-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 05fb179cbf..134f16ee1b 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -26,7 +26,8 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \ - --no-create-home --gecos "RabbitMQ messaging server" rabbitmq + --no-create-home --gecos "RabbitMQ messaging server" \ + --disabled-login rabbitmq fi chown -R rabbitmq:rabbitmq /var/lib/rabbitmq diff --git a/packaging/windows-exe/Makefile b/packaging/windows-exe/Makefile new file mode 100644 index 0000000000..59803f9ce9 --- /dev/null +++ b/packaging/windows-exe/Makefile @@ -0,0 +1,16 @@ +VERSION=0.0.0 +ZIP=../windows/rabbitmq-server-windows-$(VERSION) + +dist: rabbitmq-$(VERSION).nsi rabbitmq_server-$(VERSION) + makensis rabbitmq-$(VERSION).nsi + +rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in + sed \ + -e 's|%%VERSION%%|$(VERSION)|' \ + $< > $@ + +rabbitmq_server-$(VERSION): + unzip $(ZIP) + +clean: + rm -rf rabbitmq-*.nsi rabbitmq_server-* rabbitmq-server-*.exe diff --git a/packaging/windows-exe/lib/EnvVarUpdate.nsh b/packaging/windows-exe/lib/EnvVarUpdate.nsh new file mode 100644 index 0000000000..839d6a0206 --- /dev/null +++ b/packaging/windows-exe/lib/EnvVarUpdate.nsh @@ -0,0 +1,327 @@ +/**
+ * EnvVarUpdate.nsh
+ * : Environmental Variables: append, prepend, and remove entries
+ *
+ * WARNING: If you use StrFunc.nsh header then include it before this file
+ * with all required definitions. This is to avoid conflicts
+ *
+ * Usage:
+ * ${EnvVarUpdate} "ResultVar" "EnvVarName" "Action" "RegLoc" "PathString"
+ *
+ * Credits:
+ * Version 1.0
+ * * Cal Turney (turnec2)
+ * * Amir Szekely (KiCHiK) and e-circ for developing the forerunners of this
+ * function: AddToPath, un.RemoveFromPath, AddToEnvVar, un.RemoveFromEnvVar,
+ * WriteEnvStr, and un.DeleteEnvStr
+ * * Diego Pedroso (deguix) for StrTok
+ * * Kevin English (kenglish_hi) for StrContains
+ * * Hendri Adriaens (Smile2Me), Diego Pedroso (deguix), and Dan Fuhry
+ * (dandaman32) for StrReplace
+ *
+ * Version 1.1 (compatibility with StrFunc.nsh)
+ * * techtonik
+ *
+ * http://nsis.sourceforge.net/Environmental_Variables:_append%2C_prepend%2C_and_remove_entries
+ *
+ */
+
+
+!ifndef ENVVARUPDATE_FUNCTION
+!define ENVVARUPDATE_FUNCTION
+!verbose push
+!verbose 3
+!include "LogicLib.nsh"
+!include "WinMessages.NSH"
+!include "StrFunc.nsh"
+
+; ---- Fix for conflict if StrFunc.nsh is already includes in main file -----------------------
+!macro _IncludeStrFunction StrFuncName
+ !ifndef ${StrFuncName}_INCLUDED
+ ${${StrFuncName}}
+ !endif
+ !ifndef Un${StrFuncName}_INCLUDED
+ ${Un${StrFuncName}}
+ !endif
+ !define un.${StrFuncName} "${Un${StrFuncName}}"
+!macroend
+
+!insertmacro _IncludeStrFunction StrTok
+!insertmacro _IncludeStrFunction StrStr
+!insertmacro _IncludeStrFunction StrRep
+
+; ---------------------------------- Macro Definitions ----------------------------------------
+!macro _EnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define EnvVarUpdate '!insertmacro "_EnvVarUpdateConstructor"'
+
+!macro _unEnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call un.EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define un.EnvVarUpdate '!insertmacro "_unEnvVarUpdateConstructor"'
+; ---------------------------------- Macro Definitions end-------------------------------------
+
+;----------------------------------- EnvVarUpdate start----------------------------------------
+!define hklm_all_users 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
+!define hkcu_current_user 'HKCU "Environment"'
+
+!macro EnvVarUpdate UN
+
+Function ${UN}EnvVarUpdate
+
+ Push $0
+ Exch 4
+ Exch $1
+ Exch 3
+ Exch $2
+ Exch 2
+ Exch $3
+ Exch
+ Exch $4
+ Push $5
+ Push $6
+ Push $7
+ Push $8
+ Push $9
+ Push $R0
+
+ /* After this point:
+ -------------------------
+ $0 = ResultVar (returned)
+ $1 = EnvVarName (input)
+ $2 = Action (input)
+ $3 = RegLoc (input)
+ $4 = PathString (input)
+ $5 = Orig EnvVar (read from registry)
+ $6 = Len of $0 (temp)
+ $7 = tempstr1 (temp)
+ $8 = Entry counter (temp)
+ $9 = tempstr2 (temp)
+ $R0 = tempChar (temp) */
+
+ ; Step 1: Read contents of EnvVarName from RegLoc
+ ;
+ ; Check for empty EnvVarName
+ ${If} $1 == ""
+ SetErrors
+ DetailPrint "ERROR: EnvVarName is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for valid Action
+ ${If} $2 != "A"
+ ${AndIf} $2 != "P"
+ ${AndIf} $2 != "R"
+ SetErrors
+ DetailPrint "ERROR: Invalid Action - must be A, P, or R"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ${If} $3 == HKLM
+ ReadRegStr $5 ${hklm_all_users} $1 ; Get EnvVarName from all users into $5
+ ${ElseIf} $3 == HKCU
+ ReadRegStr $5 ${hkcu_current_user} $1 ; Read EnvVarName from current user into $5
+ ${Else}
+ SetErrors
+ DetailPrint 'ERROR: Action is [$3] but must be "HKLM" or HKCU"'
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for empty PathString
+ ${If} $4 == ""
+ SetErrors
+ DetailPrint "ERROR: PathString is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Make sure we've got some work to do
+ ${If} $5 == ""
+ ${AndIf} $2 == "R"
+ SetErrors
+ DetailPrint "$1 is empty - Nothing to remove"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Step 2: Scrub EnvVar
+ ;
+ StrCpy $0 $5 ; Copy the contents to $0
+ ; Remove spaces around semicolons (NOTE: spaces before the 1st entry or
+ ; after the last one are not removed here but instead in Step 3)
+ ${If} $0 != "" ; If EnvVar is not empty ...
+ ${Do}
+ ${${UN}StrStr} $7 $0 " ;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 " ;" ";" ; Remove '<space>;'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 "; "
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 "; " ";" ; Remove ';<space>'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 ";;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 ";;" ";"
+ ${Loop}
+
+ ; Remove a leading or trailing semicolon from EnvVar
+ StrCpy $7 $0 1 0
+ ${If} $7 == ";"
+ StrCpy $0 $0 "" 1 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ StrLen $6 $0
+ IntOp $6 $6 - 1
+ StrCpy $7 $0 1 $6
+ ${If} $7 == ";"
+ StrCpy $0 $0 $6 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ ; DetailPrint "Scrubbed $1: [$0]" ; Uncomment to debug
+ ${EndIf}
+
+ /* Step 3. Remove all instances of the target path/string (even if "A" or "P")
+ $6 = bool flag (1 = found and removed PathString)
+ $7 = a string (e.g. path) delimited by semicolon(s)
+ $8 = entry counter starting at 0
+ $9 = copy of $0
+ $R0 = tempChar */
+
+ ${If} $5 != "" ; If EnvVar is not empty ...
+ StrCpy $9 $0
+ StrCpy $0 ""
+ StrCpy $8 0
+ StrCpy $6 0
+
+ ${Do}
+ ${${UN}StrTok} $7 $9 ";" $8 "0" ; $7 = next entry, $8 = entry counter
+
+ ${If} $7 == "" ; If we've run out of entries,
+ ${ExitDo} ; were done
+ ${EndIf} ;
+
+ ; Remove leading and trailing spaces from this entry (critical step for Action=Remove)
+ ${Do}
+ StrCpy $R0 $7 1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 "" 1 ; Remove leading space
+ ${Loop}
+ ${Do}
+ StrCpy $R0 $7 1 -1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 -1 ; Remove trailing space
+ ${Loop}
+ ${If} $7 == $4 ; If string matches, remove it by not appending it
+ StrCpy $6 1 ; Set 'found' flag
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 == "" ; and the 1st string being added to $0,
+ StrCpy $0 $7 ; copy it to $0 without a prepended semicolon
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 != "" ; and this is NOT the 1st string to be added to $0,
+ StrCpy $0 $0;$7 ; append path to $0 with a prepended semicolon
+ ${EndIf} ;
+
+ IntOp $8 $8 + 1 ; Bump counter
+ ${Loop} ; Check for duplicates until we run out of paths
+ ${EndIf}
+
+ ; Step 4: Perform the requested Action
+ ;
+ ${If} $2 != "R" ; If Append or Prepend
+ ${If} $6 == 1 ; And if we found the target
+ DetailPrint "Target is already present in $1. It will be removed and"
+ ${EndIf}
+ ${If} $0 == "" ; If EnvVar is (now) empty
+ StrCpy $0 $4 ; just copy PathString to EnvVar
+ ${If} $6 == 0 ; If found flag is either 0
+ ${OrIf} $6 == "" ; or blank (if EnvVarName is empty)
+ DetailPrint "$1 was empty and has been updated with the target"
+ ${EndIf}
+ ${ElseIf} $2 == "A" ; If Append (and EnvVar is not empty),
+ StrCpy $0 $0;$4 ; append PathString
+ ${If} $6 == 1
+ DetailPrint "appended to $1"
+ ${Else}
+ DetailPrint "Target was appended to $1"
+ ${EndIf}
+ ${Else} ; If Prepend (and EnvVar is not empty),
+ StrCpy $0 $4;$0 ; prepend PathString
+ ${If} $6 == 1
+ DetailPrint "prepended to $1"
+ ${Else}
+ DetailPrint "Target was prepended to $1"
+ ${EndIf}
+ ${EndIf}
+ ${Else} ; If Action = Remove
+ ${If} $6 == 1 ; and we found the target
+ DetailPrint "Target was found and removed from $1"
+ ${Else}
+ DetailPrint "Target was NOT found in $1 (nothing to remove)"
+ ${EndIf}
+ ${If} $0 == ""
+ DetailPrint "$1 is now empty"
+ ${EndIf}
+ ${EndIf}
+
+ ; Step 5: Update the registry at RegLoc with the updated EnvVar and announce the change
+ ;
+ ClearErrors
+ ${If} $3 == HKLM
+ WriteRegExpandStr ${hklm_all_users} $1 $0 ; Write it in all users section
+ ${ElseIf} $3 == HKCU
+ WriteRegExpandStr ${hkcu_current_user} $1 $0 ; Write it to current user section
+ ${EndIf}
+
+ IfErrors 0 +4
+ MessageBox MB_OK|MB_ICONEXCLAMATION "Could not write updated $1 to $3"
+ DetailPrint "Could not write updated $1 to $3"
+ Goto EnvVarUpdate_Restore_Vars
+
+ ; "Export" our change
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+ EnvVarUpdate_Restore_Vars:
+ ;
+ ; Restore the user's variables and return ResultVar
+ Pop $R0
+ Pop $9
+ Pop $8
+ Pop $7
+ Pop $6
+ Pop $5
+ Pop $4
+ Pop $3
+ Pop $2
+ Pop $1
+ Push $0 ; Push my $0 (ResultVar)
+ Exch
+ Pop $0 ; Restore his $0
+
+FunctionEnd
+
+!macroend ; EnvVarUpdate UN
+!insertmacro EnvVarUpdate ""
+!insertmacro EnvVarUpdate "un."
+;----------------------------------- EnvVarUpdate end----------------------------------------
+
+!verbose pop
+!endif
diff --git a/packaging/windows-exe/rabbitmq.ico b/packaging/windows-exe/rabbitmq.ico Binary files differnew file mode 100644 index 0000000000..5e169a7996 --- /dev/null +++ b/packaging/windows-exe/rabbitmq.ico diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in new file mode 100644 index 0000000000..6d79ffd4fc --- /dev/null +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -0,0 +1,241 @@ +; Use the "Modern" UI +!include MUI2.nsh +!include LogicLib.nsh +!include WinMessages.nsh +!include FileFunc.nsh +!include WordFunc.nsh +!include lib\EnvVarUpdate.nsh + +!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"' +!define uninstall "Software\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ" + +;-------------------------------- + +; The name of the installer +Name "RabbitMQ Server %%VERSION%%" + +; The file to write +OutFile "rabbitmq-server-%%VERSION%%.exe" + +; Icons +!define MUI_ICON "rabbitmq.ico" + +; The default installation directory +InstallDir "$PROGRAMFILES\RabbitMQ Server" + +; Registry key to check for directory (so if you install again, it will +; overwrite the old one automatically) +InstallDirRegKey HKLM "Software\VMware, Inc.\RabbitMQ Server" "Install_Dir" + +; Request application privileges for Windows Vista +RequestExecutionLevel admin + +SetCompressor /solid lzma + +VIProductVersion "%%VERSION%%.0" +VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductVersion" "%%VERSION%%" +VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server" +;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" "" +VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc" +;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ? +VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved." +VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server" +VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%" + +;-------------------------------- + +; Pages + + +; !insertmacro MUI_PAGE_LICENSE "..\..\LICENSE-MPL-RabbitMQ" + !insertmacro MUI_PAGE_COMPONENTS + !insertmacro MUI_PAGE_DIRECTORY + !insertmacro MUI_PAGE_INSTFILES + !insertmacro MUI_PAGE_FINISH + + !insertmacro MUI_UNPAGE_CONFIRM + !insertmacro MUI_UNPAGE_INSTFILES + !define MUI_FINISHPAGE_TEXT "RabbitMQ Server %%VERSION%% has been uninstalled from your computer.$\n$\nPlease note that the log and database directories located at $APPDATA\RabbitMQ have not been removed. You can remove them manually if desired." + !insertmacro MUI_UNPAGE_FINISH + +;-------------------------------- +;Languages + + !insertmacro MUI_LANGUAGE "English" + +;-------------------------------- + +; The stuff to install +Section "RabbitMQ Server (required)" Rabbit + + SectionIn RO + + ; Set output path to the installation directory. + SetOutPath $INSTDIR + + ; Put files there + File /r "rabbitmq_server-%%VERSION%%" + File "rabbitmq.ico" + + ; Add to PATH + ${EnvVarUpdate} $0 "PATH" "A" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin" + + ; Write the installation path into the registry + WriteRegStr HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" "Install_Dir" "$INSTDIR" + + ; Write the uninstall keys for Windows + WriteRegStr HKLM ${uninstall} "DisplayName" "RabbitMQ Server" + WriteRegStr HKLM ${uninstall} "UninstallString" "$INSTDIR\uninstall.exe" + WriteRegStr HKLM ${uninstall} "DisplayIcon" "$INSTDIR\uninstall.exe,0" + WriteRegStr HKLM ${uninstall} "Publisher" "VMware, Inc." + WriteRegStr HKLM ${uninstall} "DisplayVersion" "%%VERSION%%" + WriteRegDWORD HKLM ${uninstall} "NoModify" 1 + WriteRegDWORD HKLM ${uninstall} "NoRepair" 1 + + ${GetSize} "$INSTDIR" "/S=0K" $0 $1 $2 + IntFmt $0 "0x%08X" $0 + WriteRegDWORD HKLM "${uninstall}" "EstimatedSize" "$0" + + WriteUninstaller "uninstall.exe" +SectionEnd + +;-------------------------------- + +Section "RabbitMQ Service" RabbitService + ExpandEnvStrings $0 %COMSPEC% + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" install' + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" start' + CopyFiles "$WINDIR\.erlang.cookie" "$PROFILE\.erlang.cookie" +SectionEnd + +;-------------------------------- + +Section "Start Menu" RabbitStartMenu + ; In case the service is not installed, or the service installation fails, + ; make sure these exist or Explorer will get confused. + CreateDirectory "$APPDATA\RabbitMQ\log" + CreateDirectory "$APPDATA\RabbitMQ\db" + + CreateDirectory "$SMPROGRAMS\RabbitMQ Server" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0 + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Plugins Directory.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Log Directory.lnk" "$APPDATA\RabbitMQ\log" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Database Directory.lnk" "$APPDATA\RabbitMQ\db" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\(Re)Install Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Remove Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Start Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico" + CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Stop Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico" + +SectionEnd + +;-------------------------------- + +; Section descriptions + +LangString DESC_Rabbit ${LANG_ENGLISH} "The RabbitMQ Server." +LangString DESC_RabbitService ${LANG_ENGLISH} "Set up RabbitMQ as a Windows Service." +LangString DESC_RabbitStartMenu ${LANG_ENGLISH} "Add some useful links to the start menu." + +!insertmacro MUI_FUNCTION_DESCRIPTION_BEGIN + !insertmacro MUI_DESCRIPTION_TEXT ${Rabbit} $(DESC_Rabbit) + !insertmacro MUI_DESCRIPTION_TEXT ${RabbitService} $(DESC_RabbitService) + !insertmacro MUI_DESCRIPTION_TEXT ${RabbitStartMenu} $(DESC_RabbitStartMenu) +!insertmacro MUI_FUNCTION_DESCRIPTION_END + +;-------------------------------- + +; Uninstaller + +Section "Uninstall" + + ; Remove registry keys + DeleteRegKey HKLM ${uninstall} + DeleteRegKey HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" + + ; TODO these will fail if the service is not installed - do we care? + ExpandEnvStrings $0 %COMSPEC% + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" stop' + ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" remove' + + ; Remove from PATH + ${un.EnvVarUpdate} $0 "PATH" "R" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin" + + ; Remove files and uninstaller + RMDir /r "$INSTDIR\rabbitmq_server-%%VERSION%%" + Delete "$INSTDIR\rabbitmq.ico" + Delete "$INSTDIR\uninstall.exe" + + ; Remove start menu items + RMDir /r "$SMPROGRAMS\RabbitMQ Server" + + DeleteRegValue ${env_hklm} ERLANG_HOME + SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000 + +SectionEnd + +;-------------------------------- + +; Functions + +Function .onInit + Call findErlang + + ReadRegStr $0 HKLM ${uninstall} "UninstallString" + ${If} $0 != "" + MessageBox MB_OKCANCEL|MB_ICONEXCLAMATION "RabbitMQ is already installed. $\n$\nClick 'OK' to remove the previous version or 'Cancel' to cancel this installation." IDCANCEL norun + + ;Run the uninstaller + ClearErrors + ExecWait $INSTDIR\uninstall.exe + + norun: + Abort + ${EndIf} +FunctionEnd + +Function findErlang + + StrCpy $0 0 + StrCpy $2 "not-found" + ${Do} + EnumRegKey $1 HKLM Software\Ericsson\Erlang $0 + ${If} $1 = "" + ${Break} + ${EndIf} + ${If} $1 <> "ErlSrv" + StrCpy $2 $1 + ${EndIf} + + IntOp $0 $0 + 1 + ${Loop} + + ${If} $2 = "not-found" + MessageBox MB_YESNO|MB_ICONEXCLAMATION "Erlang could not be detected.$\nYou must install Erlang before installing RabbitMQ. Would you like the installer to open a browser window to the Erlang download site?" IDNO abort + ExecShell "open" "http://www.erlang.org/download.html" + abort: + Abort + ${Else} + ${VersionCompare} $2 "5.6.3" $0 + ${VersionCompare} $2 "5.8.1" $1 + + ${If} $0 = 2 + MessageBox MB_OK|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is too old. Please install a more recent version." + Abort + ${ElseIf} $1 = 2 + MessageBox MB_YESNO|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is comparatively old.$\nFor best results, please install a newer version.$\nDo you wish to continue?" IDYES no_abort + Abort + no_abort: + ${EndIf} + + ReadRegStr $0 HKLM "Software\Ericsson\Erlang\$2" "" + + ; See http://nsis.sourceforge.net/Setting_Environment_Variables + WriteRegExpandStr ${env_hklm} ERLANG_HOME $0 + SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000 + + ; On Windows XP changing the permanent environment does not change *our* + ; environment, so do that as well. + System::Call 'Kernel32::SetEnvironmentVariableA(t, t) i("ERLANG_HOME", "$0").r0' + ${EndIf} + +FunctionEnd
\ No newline at end of file diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 5c390a5193..2f80eb96c3 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -16,7 +16,6 @@ ## SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ --kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 0cfa5ea848..2ca9f2b37a 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -142,7 +142,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( +W w ^
+A30 ^
+P 1048576 ^
--kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 43520b55cb..bc452fea59 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -207,7 +207,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay,true}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/src/delegate.erl b/src/delegate.erl index ff55a15b40..17046201ad 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,8 +36,6 @@ ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], [{pid(), term()}]}). --spec(delegate_count/0 :: () -> non_neg_integer()). - -endif. %%---------------------------------------------------------------------------- @@ -68,9 +66,9 @@ invoke(Pids, Fun) when is_list(Pids) -> {Replies, BadNodes} = case orddict:fetch_keys(Grouped) of [] -> {[], []}; - RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(), - {invoke, Fun, Grouped}, - infinity) + RemoteNodes -> gen_server2:multi_call( + RemoteNodes, delegate(RemoteNodes), + {invoke, Fun, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, @@ -92,7 +90,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(), + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), {invoke, Fun, Grouped}) end, safe_invoke(LocalPids, Fun), %% must not die @@ -111,17 +109,14 @@ group_pids_by_node(Pids) -> node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} end, {[], orddict:new()}, Pids). -delegate_count() -> - {ok, Count} = application:get_env(rabbit, delegate_count), - Count. - delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate() -> +delegate(RemoteNodes) -> case get(delegate) of undefined -> Name = delegate_name( - erlang:phash2(self(), delegate_count())), + erlang:phash2(self(), + delegate_sup:count(RemoteNodes))), put(delegate, Name), Name; Name -> Name diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 5274722145..fc693c7d3d 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/1, count/1]). -export([init/1]). @@ -28,20 +28,32 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}). +-spec(count/1 :: ([node()]) -> integer()). -endif. %%---------------------------------------------------------------------------- -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_link(Count) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]). + +count([]) -> + 1; +count([Node | Nodes]) -> + try + length(supervisor:which_children({?SERVER, Node})) + catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> + count(Nodes); + exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown; + R =:= nodedown -> + count(Nodes) + end. %%---------------------------------------------------------------------------- -init(_Args) -> - DCount = delegate:delegate_count(), +init([Count]) -> {ok, {{one_for_one, 10, 10}, [{Num, {delegate, start_link, [Num]}, transient, 16#ffffffff, worker, [delegate]} || - Num <- lists:seq(0, DCount - 1)]}}. + Num <- lists:seq(0, Count - 1)]}}. diff --git a/src/rabbit.erl b/src/rabbit.erl index b041a6372c..1beed5c1a7 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -27,7 +27,7 @@ %%--------------------------------------------------------------------------- %% Boot steps. --export([maybe_insert_default_data/0]). +-export([maybe_insert_default_data/0, boot_delegate/0]). -rabbit_boot_step({codec_correctness_check, [{description, "codec correctness check"}, @@ -101,8 +101,7 @@ -rabbit_boot_step({delegate_sup, [{description, "cluster delegate"}, - {mfa, {rabbit_sup, start_child, - [delegate_sup]}}, + {mfa, {rabbit, boot_delegate, []}}, {requires, kernel_ready}, {enables, core_initialized}]}). @@ -145,13 +144,18 @@ {requires, routing_ready}, {enables, networking}]}). +-rabbit_boot_step({direct_client, + [{mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, - {requires, log_relay}, - {enables, networking_listening}]}). + {requires, log_relay}]}). --rabbit_boot_step({networking_listening, - [{description, "network listeners available"}]}). +-rabbit_boot_step({notify_cluster, + [{description, "notify cluster nodes"}, + {mfa, {rabbit_node_monitor, notify_cluster, []}}, + {requires, networking}]}). %%--------------------------------------------------------------------------- @@ -179,6 +183,9 @@ {running_nodes, [node()]}]). -spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). +-spec(maybe_insert_default_data/0 :: () -> 'ok'). +-spec(boot_delegate/0 :: () -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -225,11 +232,11 @@ start(normal, []) -> case erts_version_check() of ok -> {ok, SupPid} = rabbit_sup:start_link(), + true = register(rabbit, self()), print_banner(), [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), - {ok, SupPid}; Error -> Error @@ -448,6 +455,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, end end. +boot_delegate() -> + {ok, Count} = application:get_env(rabbit, delegate_count), + rabbit_sup:start_child(delegate_sup, [Count]). + maybe_insert_default_data() -> case rabbit_mnesia:is_db_empty() of true -> insert_default_data(); diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ad9e3ce609..6e5aae27c7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -137,7 +137,9 @@ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit()). + rabbit_types:connection_exit() | + fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: @@ -215,8 +217,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [_] -> %% Q exists on stopped node rabbit_misc:const(not_found) end; - [ExistingQ] -> - rabbit_misc:const(ExistingQ) + [ExistingQ = #amqqueue{pid = QPid}] -> + case rabbit_misc:is_process_alive(QPid) of + true -> rabbit_misc:const(ExistingQ); + false -> TailFun = internal_delete(QueueName), + fun (Tx) -> TailFun(Tx), ExistingQ end + end end end). @@ -350,7 +356,8 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> + delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). @@ -415,7 +422,7 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - delegate_cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). @@ -432,17 +439,15 @@ internal_delete1(QueueName) -> rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> - rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> internal_delete1(QueueName) + [] -> rabbit_misc:const({error, not_found}); + [_] -> Deletions = internal_delete1(QueueName), + fun (Tx) -> ok = rabbit_binding:process_deletions( + Deletions, Tx) + end end - end, - fun ({error, _} = Err, _Tx) -> - Err; - (Deletions, Tx) -> - ok = rabbit_binding:process_deletions(Deletions, Tx) end). maybe_run_queue_via_backing_queue(QPid, Fun) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3418c663f4..0d8a4c92e9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -21,7 +21,7 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(SYNC_INTERVAL, 5). %% milliseconds +-define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(BASE_MESSAGE_PROPERTIES, @@ -122,6 +122,8 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> terminate(_Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> + rabbit_event:notify( + queue_deleted, [{pid, self()}]), BQS1 = BQ:delete_and_terminate(BQS), %% don't care if the internal delete %% doesn't return 'ok'. @@ -186,7 +188,6 @@ terminate_shutdown(Fun, State) -> end, BQS, all_ch_record()), [emit_consumer_deleted(Ch, CTag) || {Ch, CTag, _} <- consumers(State1)], - rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -617,6 +618,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. +backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> + maybe_run_queue_via_backing_queue( + fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> {Guids, BQS1} = Fun(BQS), run_message_queue( @@ -786,20 +791,20 @@ handle_call({init, Recover}, From, handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of - true -> erlang:monitor(process, Owner), - declare(Recover, From, State); - _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined} = State, - gen_server2:reply(From, not_found), - case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", [QName]) - end, - BQS = BQ:init(QName, IsDurable, Recover), - %% Rely on terminate to delete the queue. - {stop, normal, State#q{backing_queue_state = BQS}} + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + declare(Recover, From, State); + false -> #q{backing_queue = BQ, backing_queue_state = undefined, + q = #amqqueue{name = QName, durable = IsDurable}} = State, + gen_server2:reply(From, not_found), + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]) + end, + BQS = BQ:init(QName, IsDurable, Recover), + %% Rely on terminate to delete the queue. + {stop, normal, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> @@ -996,10 +1001,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> noreply(maybe_run_queue_via_backing_queue(Fun, State)); -handle_cast(sync_timeout, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS), - sync_timer_ref = undefined}); +handle_cast(sync_timeout, State) -> + noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -1133,9 +1136,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; -handle_info(timeout, State = #q{backing_queue = BQ}) -> - noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); +handle_info(timeout, State) -> + noreply(backing_queue_idle_timeout(State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index b270927be3..96a22dcaf1 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -331,7 +331,7 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). maybe_auto_delete(XName, Bindings, Deletions) -> - case mnesia:read(rabbit_exchange, XName) of + case mnesia:read({rabbit_exchange, XName}) of [] -> add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); [X] -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 91559ea6c9..a82e5eff3e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -277,7 +277,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); -handle_info({'DOWN', _MRef, process, QPid, _Reason}, +handle_info({'DOWN', _MRef, process, QPid, Reason}, State = #ch{unconfirmed = UC}) -> %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to @@ -286,8 +286,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, gb_trees:next(gb_trees:iterator(UC)), QPid, {[], UC}, State), erase_queue_stats(QPid), - noreply( - queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))). + State1 = case Reason of + normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); + _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) + end, + noreply(queue_blocked(QPid, State1)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -505,6 +508,8 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> Qs1 = sets:del_element(QPid, Qs), + %% these confirms will be emitted even when a queue dies, but that + %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), case sets:size(Qs1) of 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; @@ -735,16 +740,22 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{unacked_message_q = UAMQ}) -> + _, State = #ch{unacked_message_q = UAMQ, + limiter_pid = LimiterPid}) -> + OkFun = fun () -> ok end, ok = fold_per_queue( fun (QPid, MsgIds, ok) -> %% The Qpid python test suite incorrectly assumes %% that messages will be requeued in their original %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. - rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), self()) + rabbit_misc:with_exit_handler( + OkFun, fun () -> + rabbit_amqqueue:requeue( + QPid, lists:reverse(MsgIds), self()) + end) end, ok, UAMQ), + ok = notify_limiter(LimiterPid, UAMQ), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1249,6 +1260,16 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. +send_nacks([], State) -> + State; +send_nacks(MXs, State) -> + MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ], + coalesce_and_send(MsgSeqNos, + fun(MsgSeqNo, Multiple) -> + #'basic.nack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State). + send_confirms(State = #ch{confirmed = C}) -> C1 = lists:append(C), MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), @@ -1258,28 +1279,32 @@ send_confirms(State = #ch{confirmed = C}) -> send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> - send_confirm(MsgSeqNo, WriterPid), + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = MsgSeqNo}), State; -send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> - SCs = lists:usort(Cs), +send_confirms(Cs, State) -> + coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> + #'basic.ack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State). + +coalesce_and_send(MsgSeqNos, MkMsgFun, + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + SMsgSeqNos = lists:usort(MsgSeqNos), CutOff = case gb_trees:is_empty(UC) of - true -> lists:last(SCs) + 1; + true -> lists:last(SMsgSeqNos) + 1; false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo end, - {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of [] -> ok; _ -> ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), - multiple = true}) + WriterPid, MkMsgFun(lists:last(Ms), true)) end, - [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], + [ok = rabbit_writer:send_command( + WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -send_confirm(SeqNo, WriterPid) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = SeqNo}). - terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index d426d55df5..d21cfdb7fb 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -31,9 +31,11 @@ -export_type([start_link_args/0]). -type(start_link_args() :: - {rabbit_types:protocol(), rabbit_net:socket(), + {'tcp', rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), pid()}). + rabbit_types:user(), rabbit_types:vhost(), pid()} | + {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), + rabbit_types:vhost(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -41,7 +43,7 @@ %%---------------------------------------------------------------------------- -start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, +start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = @@ -58,7 +60,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), - {ok, SupPid, {ChannelPid, AState}}. + {ok, SupPid, {ChannelPid, AState}}; +start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ClientChannelPid, ClientChannelPid, + User, VHost, Collector, start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, SupPid, {ChannelPid, none}}. %%---------------------------------------------------------------------------- diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl index 1c2bbb6548..dbdc6cd429 100644 --- a/src/tcp_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -14,7 +14,7 @@ %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% --module(tcp_client_sup). +-module(rabbit_client_sup). -behaviour(supervisor2). @@ -22,6 +22,21 @@ -export([init/1]). +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (mfa()) -> + rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: ({'local', atom()}, mfa()) -> + rabbit_types:ok_pid_or_error()). + +-endif. + +%%---------------------------------------------------------------------------- + start_link(Callback) -> supervisor2:start_link(?MODULE, Callback). @@ -29,6 +44,5 @@ start_link(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, - [{tcp_client, {M,F,A}, - temporary, infinity, supervisor, [M]}]}}. + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl new file mode 100644 index 0000000000..3b8c9fba39 --- /dev/null +++ b/src/rabbit_direct.erl @@ -0,0 +1,75 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_direct). + +-export([boot/0, connect/3, start_channel/5]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(boot/0 :: () -> 'ok'). +-spec(connect/3 :: (binary(), binary(), binary()) -> + {'ok', {rabbit_types:user(), + rabbit_framing:amqp_table()}}). +-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(), + rabbit_types:user(), rabbit_types:vhost(), pid()) -> + {'ok', pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +boot() -> + {ok, _} = + supervisor2:start_child( + rabbit_sup, + {rabbit_direct_client_sup, + {rabbit_client_sup, start_link, + [{local, rabbit_direct_client_sup}, + {rabbit_channel_sup, start_link, []}]}, + transient, infinity, supervisor, [rabbit_client_sup]}), + ok. + +%%---------------------------------------------------------------------------- + +connect(Username, Password, VHost) -> + case lists:keymember(rabbit, 1, application:which_applications()) of + true -> + try rabbit_access_control:user_pass_login(Username, Password) of + #user{} = User -> + try rabbit_access_control:check_vhost_access(User, VHost) of + ok -> {ok, {User, rabbit_reader:server_properties()}} + catch + exit:#amqp_error{name = access_refused} -> + {error, access_refused} + end + catch + exit:#amqp_error{name = access_refused} -> {error, auth_failure} + end; + false -> + {error, broker_not_found_on_node} + end. + +start_channel(Number, ClientChannelPid, User, VHost, Collector) -> + {ok, _, {ChannelPid, _}} = + supervisor2:start_child( + rabbit_direct_client_sup, + [{direct, Number, ClientChannelPid, User, VHost, Collector}]), + {ok, ChannelPid}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3a4fb024fe..abc27c5f7d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,6 +56,7 @@ -export([lock_file/1]). -export([const_ok/1, const/1]). -export([ntoa/1, ntoab/1]). +-export([is_process_alive/1]). %%---------------------------------------------------------------------------- @@ -194,6 +195,7 @@ -spec(const/1 :: (A) -> const(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). +-spec(is_process_alive/1 :: (pid()) -> boolean()). -endif. @@ -240,11 +242,20 @@ assert_args_equivalence1(Orig, New, Name, Key) -> {Same, Same} -> ok; {Orig1, New1} -> protocol_error( precondition_failed, - "inequivalent arg '~s' for ~s: " - "required ~w, received ~w", - [Key, rabbit_misc:rs(Name), New1, Orig1]) + "inequivalent arg '~s' for ~s: " + "received ~s but current is ~s", + [Key, rs(Name), val(New1), val(Orig1)]) end. +val(undefined) -> + "none"; +val({Type, Value}) -> + Fmt = case is_binary(Value) of + true -> "the value '~s' of type '~s'"; + false -> "the value '~w' of type '~s'" + end, + lists:flatten(io_lib:format(Fmt, [Value, Type])). + dirty_read(ReadSpec) -> case mnesia:dirty_read(ReadSpec) of [Result] -> {ok, Result}; @@ -341,8 +352,11 @@ throw_on_error(E, Thunk) -> with_exit_handler(Handler, Thunk) -> try Thunk() - catch exit:{R, _} when R =:= noproc; R =:= nodedown; - R =:= normal; R =:= shutdown -> + catch + exit:{R, _} when R =:= noproc; R =:= nodedown; + R =:= normal; R =:= shutdown -> + Handler(); + exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> Handler() end. @@ -849,3 +863,12 @@ ntoab(IP) -> 0 -> Str; _ -> "[" ++ Str ++ "]" end. + +is_process_alive(Pid) when node(Pid) =:= node() -> + erlang:is_process_alive(Pid); +is_process_alive(Pid) -> + case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of + true -> true; + _ -> false + end. + diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 529e3e0706..7f3cf35faa 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,7 +33,7 @@ -include("rabbit_msg_store.hrl"). --define(SYNC_INTERVAL, 5). %% milliseconds +-define(SYNC_INTERVAL, 25). %% milliseconds -define(CLEAN_FILENAME, "clean.dot"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets"). @@ -738,45 +738,36 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_dying, CRef}, State = #msstate { dying_clients = DyingClients }) -> DyingClients1 = sets:add_element(CRef, DyingClients), - write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); + noreply(write_message(CRef, <<>>, + State #msstate { dying_clients = DyingClients1 })); handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, Guid}, - State = #msstate { file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case should_mask_action(CRef, Guid, State) of - {true, _Location} -> - noreply(State); - {false, not_found} -> - write_message(CRef, Guid, Msg, State); - {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize }} -> - case {Mask, ets:lookup(FileSummaryEts, File)} of - {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), - write_message(CRef, Guid, Msg, State); - {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client death - %% message, but as it is being GC'd currently, - %% we'll have to write a new copy, which will then - %% be younger, so ignore this write. - noreply(State); - {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), - State1 = client_confirm_if_on_disk(CRef, Guid, File, State), - noreply(adjust_valid_total_size(File, TotalSize, State1)) - end; - {_Mask, #msg_location { ref_count = RefCount, file = File }} -> - %% We already know about it, just update counter. Only - %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) - end; + noreply( + case write_action(should_mask_action(CRef, Guid, State), Guid, State) of + {write, State1} -> + write_message(CRef, Guid, Msg, State1); + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, Guid, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTG) -> + MsgOnDiskFun(gb_sets:singleton(Guid), written), + CTG + end, CRef, State1) + end); handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( @@ -924,6 +915,37 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. +write_action({true, not_found}, _Guid, State) -> + {ignore, undefined, State}; +write_action({true, #msg_location { file = File }}, _Guid, State) -> + {ignore, File, State}; +write_action({false, not_found}, _Guid, State) -> + {write, State}; +write_action({Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize }}, + Guid, State = #msstate { file_summary_ets = FileSummaryEts }) -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> + ok = index_delete(Guid, State), + {write, State}; + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client death + %% message, but as it is being GC'd currently we'll have + %% to write a new copy, which will then be younger, so + %% ignore this write. + {ignore, File, State}; + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + State1 = adjust_valid_total_size(File, TotalSize, State), + {confirm, File, State1} + end; +write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, + Guid, State) -> + ok = index_update_ref_count(Guid, RefCount + 1, State), + %% We already know about it, just update counter. Only update + %% field otherwise bad interaction with concurrent GC + {confirm, File, State}. + write_message(CRef, Guid, Msg, State) -> write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). @@ -943,11 +965,10 @@ write_message(Guid, Msg, [_,_] = ets:update_counter(FileSummaryEts, CurFile, [{#file_summary.valid_total_size, TotalSize}, {#file_summary.file_size, TotalSize}]), - NextOffset = CurOffset + TotalSize, - noreply(maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })). + maybe_roll_to_new_file(CurOffset + TotalSize, + State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize }). read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -1134,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) -> gb_sets:singleton(Guid), CTG) end, CRef, State). -client_confirm_if_on_disk(CRef, Guid, CurFile, - State = #msstate { current_file = CurFile }) -> - record_pending_confirm(CRef, Guid, State); -client_confirm_if_on_disk(CRef, Guid, _File, State) -> - update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(gb_sets:singleton(Guid), written), - CTG - end, CRef, State). - client_confirm(CRef, Guids, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTG) -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 031d4f181d..36f61628b8 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -32,16 +32,6 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). --define(RABBIT_TCP_OPTS, [ - binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - {backlog, 128}, % use the maximum listen(2) backlog value - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, - {exit_on_close, false} - ]). - -define(SSL_TIMEOUT, 5). %% seconds -define(FIRST_TEST_BIND_PORT, 10000). @@ -115,13 +105,13 @@ boot_ssl() -> end. start() -> - {ok,_} = supervisor:start_child( + {ok,_} = supervisor2:start_child( rabbit_sup, {rabbit_tcp_client_sup, - {tcp_client_sup, start_link, + {rabbit_client_sup, start_link, [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]}, - transient, infinity, supervisor, [tcp_client_sup]}), + transient, infinity, supervisor, [rabbit_client_sup]}), ok. %% inet_parse:address takes care of ip string, like "0.0.0.0" @@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS], + [IPAddress, Port, [Family | tcp_opts()], {?MODULE, tcp_listener_started, [Protocol]}, {?MODULE, tcp_listener_stopped, [Protocol]}, OnConnect, Label]}, @@ -315,6 +305,10 @@ hostname() -> cmap(F) -> rabbit_misc:filter_exit_map(F, connections()). +tcp_opts() -> + {ok, Opts} = application:get_env(rabbit, tcp_listen_options), + Opts. + %%-------------------------------------------------------------------- %% There are three kinds of machine (for our purposes). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index e4bc1cdc5a..817abaa2bd 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -22,14 +22,41 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([notify_cluster/0, rabbit_running_on/1]). -define(SERVER, ?MODULE). +-define(RABBIT_UP_RPC_TIMEOUT, 2000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(rabbit_running_on/1 :: (node()) -> 'ok'). +-spec(notify_cluster/0 :: () -> 'ok'). + +-endif. %%-------------------------------------------------------------------- start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +rabbit_running_on(Node) -> + gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). + +notify_cluster() -> + Node = node(), + Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + %% notify other rabbits of this rabbit + case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, + [Node], ?RABBIT_UP_RPC_TIMEOUT) of + {_, [] } -> ok; + {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) + end, + %% register other active rabbits with this rabbit + [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ], + ok. + %%-------------------------------------------------------------------- init([]) -> @@ -39,19 +66,20 @@ init([]) -> handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast({rabbit_running_on, Node}, State) -> + rabbit_log:info("node ~p up~n", [Node]), + erlang:monitor(process, {rabbit, Node}), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({nodeup, Node}, State) -> - rabbit_log:info("node ~p up", [Node]), - {noreply, State}; handle_info({nodedown, Node}, State) -> - rabbit_log:info("node ~p down", [Node]), - %% TODO: This may turn out to be a performance hog when there are - %% lots of nodes. We really only need to execute this code on - %% *one* node, rather than all of them. - ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node), + rabbit_log:info("node ~p down~n", [Node]), + ok = handle_dead_rabbit(Node), + {noreply, State}; +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> + rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), + ok = handle_dead_rabbit(Node), {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -64,3 +92,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- +%% TODO: This may turn out to be a performance hog when there are +%% lots of nodes. We really only need to execute this code on +%% *one* node, rather than all of them. +handle_dead_rabbit(Node) -> + ok = rabbit_networking:on_node_down(Node), + ok = rabbit_amqqueue:on_node_down(Node). + diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 52e0bb9dd5..1781469a13 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -72,7 +72,13 @@ %% pre-init: %% receive protocol header -> send connection.start, *starting* %% starting: -%% receive connection.start_ok -> send connection.tune, *tuning* +%% receive connection.start_ok -> *securing* +%% securing: +%% check authentication credentials +%% if authentication success -> send connection.tune, *tuning* +%% if more challenge needed -> send connection.secure, +%% receive connection.secure_ok *securing* +%% otherwise send close, *exit* %% tuning: %% receive connection.tune_ok -> start heartbeats, *opening* %% opening: @@ -351,7 +357,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({handshake_timeout, State#v1.callback}) end; timeout -> - throw({timeout, State#v1.connection_state}); + case State#v1.connection_state of + closed -> mainloop(Deb, State); + S -> throw({timeout, S}) + end; {'$gen_call', From, {shutdown, Explanation}} -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -916,10 +925,14 @@ socket_info(Get, Select) -> end. ssl_info(F, Sock) -> + %% The first ok form is R14 + %% The second is R13 - the extra term is exportability (by inspection, + %% the docs are wrong) case rabbit_net:ssl_info(Sock) of - nossl -> ''; - {error, _} -> ''; - {ok, Info} -> F(Info) + nossl -> ''; + {error, _} -> ''; + {ok, {P, {K, C, H}}} -> F({P, {K, C, H}}); + {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}}) end. cert_info(F, Sock) -> @@ -940,8 +953,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {Protocol, Sock, Channel, FrameMax, - self(), User, VHost, Collector}), + ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, + VHost, Collector}), erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 49b0950832..58c369b5a3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -26,6 +26,7 @@ -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], @@ -80,20 +81,24 @@ run_cluster_dependent_tests(SecondaryNode) -> io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), + passed = test_queue_cleanup(SecondaryNode), + passed = test_declare_on_dead_queue(SecondaryNode), %% we now run the tests remotely, so that code coverage on the %% local node picks up more of the delegate Node = node(), Self = self(), Remote = spawn(SecondaryNode, - fun () -> A = test_delegates_async(Node), - B = test_delegates_sync(Node), - Self ! {self(), {A, B}} + fun () -> Rs = [ test_delegates_async(Node), + test_delegates_sync(Node), + test_queue_cleanup(Node), + test_declare_on_dead_queue(Node) ], + Self ! {self(), Rs} end), receive {Remote, Result} -> - Result = {passed, passed} - after 2000 -> + Result = lists:duplicate(length(Result), passed) + after 30000 -> throw(timeout) end, @@ -1278,6 +1283,61 @@ test_delegates_sync(SecondaryNode) -> passed. +test_queue_cleanup_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + Pid ! Method, + test_queue_cleanup_receiver(Pid) + end. + + +test_queue_cleanup(_SecondaryNode) -> + {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1), + rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }), + receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} -> + ok + after 1000 -> throw(failed_to_receive_queue_declare_ok) + end, + rabbit:stop(), + rabbit:start(), + rabbit_channel:do(Ch, #'queue.declare'{ passive = true, + queue = ?CLEANUP_QUEUE_NAME }), + receive + {channel_exit, 1, {amqp_error, not_found, _, _}} -> + ok + after 2000 -> + throw(failed_to_receive_channel_exit) + end, + passed. + +test_declare_on_dead_queue(SecondaryNode) -> + QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME), + Self = self(), + Pid = spawn(SecondaryNode, + fun () -> + {new, #amqqueue{name = QueueName, pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], + none), + exit(QPid, kill), + Self ! {self(), killed, QPid} + end), + receive + {Pid, killed, QPid} -> + {existing, #amqqueue{name = QueueName, + pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], none), + false = rabbit_misc:is_process_alive(QPid), + {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], + none), + true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + {ok, 0} = rabbit_amqqueue:delete(Q, false, false), + passed + after 2000 -> + throw(failed_to_create_and_kill_queue) + end. + %--------------------------------------------------------------------- control_action(Command, Args) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 18e2bdadb9..1a240856ce 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -359,8 +359,8 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) {noreply, NState}; handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) -> case get_child(Child#child.name, State) of - {value, Child} -> - {ok, NState} = do_restart(RestartType, Reason, Child, State), + {value, Child1} -> + {ok, NState} = do_restart(RestartType, Reason, Child1, State), {noreply, NState}; _ -> {noreply, State} @@ -539,7 +539,7 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> {ok, _TRef} = timer:apply_after( trunc(Delay*1000), ?MODULE, delayed_restart, [self(), {{RestartType, Delay}, Reason, Child}]), - {ok, NState} + {ok, state_del_child(Child, NState)} end; do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State#state.name), diff --git a/src/test_sup.erl b/src/test_sup.erl index 76be63d0cf..b4df1fd042 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -59,19 +59,21 @@ start_child() -> ping_child(SupPid) -> Ref = make_ref(), - get_child_pid(SupPid) ! {ping, Ref, self()}, + with_child_pid(SupPid, fun(ChildPid) -> ChildPid ! {ping, Ref, self()} end), receive {pong, Ref} -> ok after 1000 -> timeout end. exit_child(SupPid) -> - true = exit(get_child_pid(SupPid), abnormal), + with_child_pid(SupPid, fun(ChildPid) -> exit(ChildPid, abnormal) end), ok. -get_child_pid(SupPid) -> - [{_Id, ChildPid, worker, [test_sup]}] = - supervisor2:which_children(SupPid), - ChildPid. +with_child_pid(SupPid, Fun) -> + case supervisor2:which_children(SupPid) of + [{_Id, undefined, worker, [test_sup]}] -> ok; + [{_Id, ChildPid, worker, [test_sup]}] -> Fun(ChildPid); + [] -> ok + end. run_child() -> receive {ping, Ref, Pid} -> Pid ! {pong, Ref}, |
