summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.hgignore3
-rw-r--r--Makefile9
-rw-r--r--ebin/rabbit_app.in12
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec6
-rw-r--r--packaging/debs/Debian/debian/changelog12
-rw-r--r--packaging/debs/Debian/debian/postinst3
-rw-r--r--packaging/windows-exe/Makefile16
-rw-r--r--packaging/windows-exe/lib/EnvVarUpdate.nsh327
-rw-r--r--packaging/windows-exe/rabbitmq.icobin0 -> 4286 bytes
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in241
-rwxr-xr-xscripts/rabbitmq-server1
-rw-r--r--scripts/rabbitmq-server.bat1
-rw-r--r--scripts/rabbitmq-service.bat1
-rw-r--r--src/delegate.erl21
-rw-r--r--src/delegate_sup.erl26
-rw-r--r--src/rabbit.erl27
-rw-r--r--src/rabbit_amqqueue.erl31
-rw-r--r--src/rabbit_amqqueue_process.erl48
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl61
-rw-r--r--src/rabbit_channel_sup.erl20
-rw-r--r--src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl)22
-rw-r--r--src/rabbit_direct.erl75
-rw-r--r--src/rabbit_misc.erl33
-rw-r--r--src/rabbit_msg_store.erl105
-rw-r--r--src/rabbit_networking.erl22
-rw-r--r--src/rabbit_node_monitor.erl53
-rw-r--r--src/rabbit_reader.erl27
-rw-r--r--src/rabbit_tests.erl70
-rw-r--r--src/supervisor2.erl6
-rw-r--r--src/test_sup.erl14
31 files changed, 1097 insertions, 198 deletions
diff --git a/.hgignore b/.hgignore
index 03b60914a3..912b4a5676 100644
--- a/.hgignore
+++ b/.hgignore
@@ -25,6 +25,9 @@ syntax: regexp
^packaging/macports/macports$
^packaging/generic-unix/rabbitmq-server-generic-unix-.*\.tar\.gz$
^packaging/windows/rabbitmq-server-windows-.*\.zip$
+^packaging/windows-exe/rabbitmq_server-.*$
+^packaging/windows-exe/rabbitmq-.*\.nsi$
+^packaging/windows-exe/rabbitmq-server-.*\.exe$
^docs/.*\.[15]\.gz$
^docs/.*\.man\.xml$
diff --git a/Makefile b/Makefile
index 00bfd62946..51b998f418 100644
--- a/Makefile
+++ b/Makefile
@@ -41,14 +41,12 @@ RABBIT_PLT=rabbit.plt
ifndef USE_SPECS
# our type specs rely on features and bug fixes in dialyzer that are
-# only available in R14A upwards (R13B04 is erts 5.7.5)
-#
-# NB: the test assumes that version number will only contain single digits
-USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.5" ]; then echo "true"; else echo "false"; fi)
+# only available in R14A upwards (R14A is erts 5.8)
+USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8]), halt().')
endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
-ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(shell [ $(USE_SPECS) = "true" ] && echo "-Duse_specs")
+ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(if $(filter true,$(USE_SPECS)),-Duse_specs)
VERSION=0.0.0
TARBALL_NAME=rabbitmq-server-$(VERSION)
@@ -314,3 +312,4 @@ endif
ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" ""
-include $(DEPS_FILE)
endif
+
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index d3e9d84b4b..f837684c60 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -8,7 +8,8 @@
rabbit_node_monitor,
rabbit_router,
rabbit_sup,
- rabbit_tcp_client_sup]},
+ rabbit_tcp_client_sup,
+ rabbit_direct_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
@@ -33,4 +34,11 @@
{collect_statistics, none},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
- {delegate_count, 16}]}]}.
+ {delegate_count, 16},
+ {tcp_listen_options, [binary,
+ {packet, raw},
+ {reuseaddr, true},
+ {backlog, 128},
+ {nodelay, true},
+ {exit_on_close, false}]}
+ ]}]}.
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index b37f7ab1fa..473168644a 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -124,6 +124,12 @@ done
rm -rf %{buildroot}
%changelog
+* Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1
+- New Upstream Release
+
+* Tue Feb 1 2011 simon@rabbitmq.com 2.3.0-1
+- New Upstream Release
+
* Mon Nov 29 2010 rob@rabbitmq.com 2.2.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index a60e691d15..12165dc0ac 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,15 @@
+rabbitmq-server (2.3.1-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Thu, 03 Feb 2011 12:43:56 +0000
+
+rabbitmq-server (2.3.0-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Tue, 01 Feb 2011 12:52:16 +0000
+
rabbitmq-server (2.2.0-1) lucid; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst
index 05fb179cbf..134f16ee1b 100644
--- a/packaging/debs/Debian/debian/postinst
+++ b/packaging/debs/Debian/debian/postinst
@@ -26,7 +26,8 @@ fi
# create rabbitmq user
if ! getent passwd rabbitmq >/dev/null; then
adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \
- --no-create-home --gecos "RabbitMQ messaging server" rabbitmq
+ --no-create-home --gecos "RabbitMQ messaging server" \
+ --disabled-login rabbitmq
fi
chown -R rabbitmq:rabbitmq /var/lib/rabbitmq
diff --git a/packaging/windows-exe/Makefile b/packaging/windows-exe/Makefile
new file mode 100644
index 0000000000..59803f9ce9
--- /dev/null
+++ b/packaging/windows-exe/Makefile
@@ -0,0 +1,16 @@
+VERSION=0.0.0
+ZIP=../windows/rabbitmq-server-windows-$(VERSION)
+
+dist: rabbitmq-$(VERSION).nsi rabbitmq_server-$(VERSION)
+ makensis rabbitmq-$(VERSION).nsi
+
+rabbitmq-$(VERSION).nsi: rabbitmq_nsi.in
+ sed \
+ -e 's|%%VERSION%%|$(VERSION)|' \
+ $< > $@
+
+rabbitmq_server-$(VERSION):
+ unzip $(ZIP)
+
+clean:
+ rm -rf rabbitmq-*.nsi rabbitmq_server-* rabbitmq-server-*.exe
diff --git a/packaging/windows-exe/lib/EnvVarUpdate.nsh b/packaging/windows-exe/lib/EnvVarUpdate.nsh
new file mode 100644
index 0000000000..839d6a0206
--- /dev/null
+++ b/packaging/windows-exe/lib/EnvVarUpdate.nsh
@@ -0,0 +1,327 @@
+/**
+ * EnvVarUpdate.nsh
+ * : Environmental Variables: append, prepend, and remove entries
+ *
+ * WARNING: If you use StrFunc.nsh header then include it before this file
+ * with all required definitions. This is to avoid conflicts
+ *
+ * Usage:
+ * ${EnvVarUpdate} "ResultVar" "EnvVarName" "Action" "RegLoc" "PathString"
+ *
+ * Credits:
+ * Version 1.0
+ * * Cal Turney (turnec2)
+ * * Amir Szekely (KiCHiK) and e-circ for developing the forerunners of this
+ * function: AddToPath, un.RemoveFromPath, AddToEnvVar, un.RemoveFromEnvVar,
+ * WriteEnvStr, and un.DeleteEnvStr
+ * * Diego Pedroso (deguix) for StrTok
+ * * Kevin English (kenglish_hi) for StrContains
+ * * Hendri Adriaens (Smile2Me), Diego Pedroso (deguix), and Dan Fuhry
+ * (dandaman32) for StrReplace
+ *
+ * Version 1.1 (compatibility with StrFunc.nsh)
+ * * techtonik
+ *
+ * http://nsis.sourceforge.net/Environmental_Variables:_append%2C_prepend%2C_and_remove_entries
+ *
+ */
+
+
+!ifndef ENVVARUPDATE_FUNCTION
+!define ENVVARUPDATE_FUNCTION
+!verbose push
+!verbose 3
+!include "LogicLib.nsh"
+!include "WinMessages.NSH"
+!include "StrFunc.nsh"
+
+; ---- Fix for conflict if StrFunc.nsh is already includes in main file -----------------------
+!macro _IncludeStrFunction StrFuncName
+ !ifndef ${StrFuncName}_INCLUDED
+ ${${StrFuncName}}
+ !endif
+ !ifndef Un${StrFuncName}_INCLUDED
+ ${Un${StrFuncName}}
+ !endif
+ !define un.${StrFuncName} "${Un${StrFuncName}}"
+!macroend
+
+!insertmacro _IncludeStrFunction StrTok
+!insertmacro _IncludeStrFunction StrStr
+!insertmacro _IncludeStrFunction StrRep
+
+; ---------------------------------- Macro Definitions ----------------------------------------
+!macro _EnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define EnvVarUpdate '!insertmacro "_EnvVarUpdateConstructor"'
+
+!macro _unEnvVarUpdateConstructor ResultVar EnvVarName Action Regloc PathString
+ Push "${EnvVarName}"
+ Push "${Action}"
+ Push "${RegLoc}"
+ Push "${PathString}"
+ Call un.EnvVarUpdate
+ Pop "${ResultVar}"
+!macroend
+!define un.EnvVarUpdate '!insertmacro "_unEnvVarUpdateConstructor"'
+; ---------------------------------- Macro Definitions end-------------------------------------
+
+;----------------------------------- EnvVarUpdate start----------------------------------------
+!define hklm_all_users 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
+!define hkcu_current_user 'HKCU "Environment"'
+
+!macro EnvVarUpdate UN
+
+Function ${UN}EnvVarUpdate
+
+ Push $0
+ Exch 4
+ Exch $1
+ Exch 3
+ Exch $2
+ Exch 2
+ Exch $3
+ Exch
+ Exch $4
+ Push $5
+ Push $6
+ Push $7
+ Push $8
+ Push $9
+ Push $R0
+
+ /* After this point:
+ -------------------------
+ $0 = ResultVar (returned)
+ $1 = EnvVarName (input)
+ $2 = Action (input)
+ $3 = RegLoc (input)
+ $4 = PathString (input)
+ $5 = Orig EnvVar (read from registry)
+ $6 = Len of $0 (temp)
+ $7 = tempstr1 (temp)
+ $8 = Entry counter (temp)
+ $9 = tempstr2 (temp)
+ $R0 = tempChar (temp) */
+
+ ; Step 1: Read contents of EnvVarName from RegLoc
+ ;
+ ; Check for empty EnvVarName
+ ${If} $1 == ""
+ SetErrors
+ DetailPrint "ERROR: EnvVarName is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for valid Action
+ ${If} $2 != "A"
+ ${AndIf} $2 != "P"
+ ${AndIf} $2 != "R"
+ SetErrors
+ DetailPrint "ERROR: Invalid Action - must be A, P, or R"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ${If} $3 == HKLM
+ ReadRegStr $5 ${hklm_all_users} $1 ; Get EnvVarName from all users into $5
+ ${ElseIf} $3 == HKCU
+ ReadRegStr $5 ${hkcu_current_user} $1 ; Read EnvVarName from current user into $5
+ ${Else}
+ SetErrors
+ DetailPrint 'ERROR: Action is [$3] but must be "HKLM" or HKCU"'
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Check for empty PathString
+ ${If} $4 == ""
+ SetErrors
+ DetailPrint "ERROR: PathString is blank"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Make sure we've got some work to do
+ ${If} $5 == ""
+ ${AndIf} $2 == "R"
+ SetErrors
+ DetailPrint "$1 is empty - Nothing to remove"
+ Goto EnvVarUpdate_Restore_Vars
+ ${EndIf}
+
+ ; Step 2: Scrub EnvVar
+ ;
+ StrCpy $0 $5 ; Copy the contents to $0
+ ; Remove spaces around semicolons (NOTE: spaces before the 1st entry or
+ ; after the last one are not removed here but instead in Step 3)
+ ${If} $0 != "" ; If EnvVar is not empty ...
+ ${Do}
+ ${${UN}StrStr} $7 $0 " ;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 " ;" ";" ; Remove '<space>;'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 "; "
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 "; " ";" ; Remove ';<space>'
+ ${Loop}
+ ${Do}
+ ${${UN}StrStr} $7 $0 ";;"
+ ${If} $7 == ""
+ ${ExitDo}
+ ${EndIf}
+ ${${UN}StrRep} $0 $0 ";;" ";"
+ ${Loop}
+
+ ; Remove a leading or trailing semicolon from EnvVar
+ StrCpy $7 $0 1 0
+ ${If} $7 == ";"
+ StrCpy $0 $0 "" 1 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ StrLen $6 $0
+ IntOp $6 $6 - 1
+ StrCpy $7 $0 1 $6
+ ${If} $7 == ";"
+ StrCpy $0 $0 $6 ; Change ';<EnvVar>' to '<EnvVar>'
+ ${EndIf}
+ ; DetailPrint "Scrubbed $1: [$0]" ; Uncomment to debug
+ ${EndIf}
+
+ /* Step 3. Remove all instances of the target path/string (even if "A" or "P")
+ $6 = bool flag (1 = found and removed PathString)
+ $7 = a string (e.g. path) delimited by semicolon(s)
+ $8 = entry counter starting at 0
+ $9 = copy of $0
+ $R0 = tempChar */
+
+ ${If} $5 != "" ; If EnvVar is not empty ...
+ StrCpy $9 $0
+ StrCpy $0 ""
+ StrCpy $8 0
+ StrCpy $6 0
+
+ ${Do}
+ ${${UN}StrTok} $7 $9 ";" $8 "0" ; $7 = next entry, $8 = entry counter
+
+ ${If} $7 == "" ; If we've run out of entries,
+ ${ExitDo} ; were done
+ ${EndIf} ;
+
+ ; Remove leading and trailing spaces from this entry (critical step for Action=Remove)
+ ${Do}
+ StrCpy $R0 $7 1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 "" 1 ; Remove leading space
+ ${Loop}
+ ${Do}
+ StrCpy $R0 $7 1 -1
+ ${If} $R0 != " "
+ ${ExitDo}
+ ${EndIf}
+ StrCpy $7 $7 -1 ; Remove trailing space
+ ${Loop}
+ ${If} $7 == $4 ; If string matches, remove it by not appending it
+ StrCpy $6 1 ; Set 'found' flag
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 == "" ; and the 1st string being added to $0,
+ StrCpy $0 $7 ; copy it to $0 without a prepended semicolon
+ ${ElseIf} $7 != $4 ; If string does NOT match
+ ${AndIf} $0 != "" ; and this is NOT the 1st string to be added to $0,
+ StrCpy $0 $0;$7 ; append path to $0 with a prepended semicolon
+ ${EndIf} ;
+
+ IntOp $8 $8 + 1 ; Bump counter
+ ${Loop} ; Check for duplicates until we run out of paths
+ ${EndIf}
+
+ ; Step 4: Perform the requested Action
+ ;
+ ${If} $2 != "R" ; If Append or Prepend
+ ${If} $6 == 1 ; And if we found the target
+ DetailPrint "Target is already present in $1. It will be removed and"
+ ${EndIf}
+ ${If} $0 == "" ; If EnvVar is (now) empty
+ StrCpy $0 $4 ; just copy PathString to EnvVar
+ ${If} $6 == 0 ; If found flag is either 0
+ ${OrIf} $6 == "" ; or blank (if EnvVarName is empty)
+ DetailPrint "$1 was empty and has been updated with the target"
+ ${EndIf}
+ ${ElseIf} $2 == "A" ; If Append (and EnvVar is not empty),
+ StrCpy $0 $0;$4 ; append PathString
+ ${If} $6 == 1
+ DetailPrint "appended to $1"
+ ${Else}
+ DetailPrint "Target was appended to $1"
+ ${EndIf}
+ ${Else} ; If Prepend (and EnvVar is not empty),
+ StrCpy $0 $4;$0 ; prepend PathString
+ ${If} $6 == 1
+ DetailPrint "prepended to $1"
+ ${Else}
+ DetailPrint "Target was prepended to $1"
+ ${EndIf}
+ ${EndIf}
+ ${Else} ; If Action = Remove
+ ${If} $6 == 1 ; and we found the target
+ DetailPrint "Target was found and removed from $1"
+ ${Else}
+ DetailPrint "Target was NOT found in $1 (nothing to remove)"
+ ${EndIf}
+ ${If} $0 == ""
+ DetailPrint "$1 is now empty"
+ ${EndIf}
+ ${EndIf}
+
+ ; Step 5: Update the registry at RegLoc with the updated EnvVar and announce the change
+ ;
+ ClearErrors
+ ${If} $3 == HKLM
+ WriteRegExpandStr ${hklm_all_users} $1 $0 ; Write it in all users section
+ ${ElseIf} $3 == HKCU
+ WriteRegExpandStr ${hkcu_current_user} $1 $0 ; Write it to current user section
+ ${EndIf}
+
+ IfErrors 0 +4
+ MessageBox MB_OK|MB_ICONEXCLAMATION "Could not write updated $1 to $3"
+ DetailPrint "Could not write updated $1 to $3"
+ Goto EnvVarUpdate_Restore_Vars
+
+ ; "Export" our change
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+ EnvVarUpdate_Restore_Vars:
+ ;
+ ; Restore the user's variables and return ResultVar
+ Pop $R0
+ Pop $9
+ Pop $8
+ Pop $7
+ Pop $6
+ Pop $5
+ Pop $4
+ Pop $3
+ Pop $2
+ Pop $1
+ Push $0 ; Push my $0 (ResultVar)
+ Exch
+ Pop $0 ; Restore his $0
+
+FunctionEnd
+
+!macroend ; EnvVarUpdate UN
+!insertmacro EnvVarUpdate ""
+!insertmacro EnvVarUpdate "un."
+;----------------------------------- EnvVarUpdate end----------------------------------------
+
+!verbose pop
+!endif
diff --git a/packaging/windows-exe/rabbitmq.ico b/packaging/windows-exe/rabbitmq.ico
new file mode 100644
index 0000000000..5e169a7996
--- /dev/null
+++ b/packaging/windows-exe/rabbitmq.ico
Binary files differ
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
new file mode 100644
index 0000000000..6d79ffd4fc
--- /dev/null
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -0,0 +1,241 @@
+; Use the "Modern" UI
+!include MUI2.nsh
+!include LogicLib.nsh
+!include WinMessages.nsh
+!include FileFunc.nsh
+!include WordFunc.nsh
+!include lib\EnvVarUpdate.nsh
+
+!define env_hklm 'HKLM "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"'
+!define uninstall "Software\Microsoft\Windows\CurrentVersion\Uninstall\RabbitMQ"
+
+;--------------------------------
+
+; The name of the installer
+Name "RabbitMQ Server %%VERSION%%"
+
+; The file to write
+OutFile "rabbitmq-server-%%VERSION%%.exe"
+
+; Icons
+!define MUI_ICON "rabbitmq.ico"
+
+; The default installation directory
+InstallDir "$PROGRAMFILES\RabbitMQ Server"
+
+; Registry key to check for directory (so if you install again, it will
+; overwrite the old one automatically)
+InstallDirRegKey HKLM "Software\VMware, Inc.\RabbitMQ Server" "Install_Dir"
+
+; Request application privileges for Windows Vista
+RequestExecutionLevel admin
+
+SetCompressor /solid lzma
+
+VIProductVersion "%%VERSION%%.0"
+VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductVersion" "%%VERSION%%"
+VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server"
+;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" ""
+VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc"
+;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ?
+VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved."
+VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server"
+VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%"
+
+;--------------------------------
+
+; Pages
+
+
+; !insertmacro MUI_PAGE_LICENSE "..\..\LICENSE-MPL-RabbitMQ"
+ !insertmacro MUI_PAGE_COMPONENTS
+ !insertmacro MUI_PAGE_DIRECTORY
+ !insertmacro MUI_PAGE_INSTFILES
+ !insertmacro MUI_PAGE_FINISH
+
+ !insertmacro MUI_UNPAGE_CONFIRM
+ !insertmacro MUI_UNPAGE_INSTFILES
+ !define MUI_FINISHPAGE_TEXT "RabbitMQ Server %%VERSION%% has been uninstalled from your computer.$\n$\nPlease note that the log and database directories located at $APPDATA\RabbitMQ have not been removed. You can remove them manually if desired."
+ !insertmacro MUI_UNPAGE_FINISH
+
+;--------------------------------
+;Languages
+
+ !insertmacro MUI_LANGUAGE "English"
+
+;--------------------------------
+
+; The stuff to install
+Section "RabbitMQ Server (required)" Rabbit
+
+ SectionIn RO
+
+ ; Set output path to the installation directory.
+ SetOutPath $INSTDIR
+
+ ; Put files there
+ File /r "rabbitmq_server-%%VERSION%%"
+ File "rabbitmq.ico"
+
+ ; Add to PATH
+ ${EnvVarUpdate} $0 "PATH" "A" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin"
+
+ ; Write the installation path into the registry
+ WriteRegStr HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server" "Install_Dir" "$INSTDIR"
+
+ ; Write the uninstall keys for Windows
+ WriteRegStr HKLM ${uninstall} "DisplayName" "RabbitMQ Server"
+ WriteRegStr HKLM ${uninstall} "UninstallString" "$INSTDIR\uninstall.exe"
+ WriteRegStr HKLM ${uninstall} "DisplayIcon" "$INSTDIR\uninstall.exe,0"
+ WriteRegStr HKLM ${uninstall} "Publisher" "VMware, Inc."
+ WriteRegStr HKLM ${uninstall} "DisplayVersion" "%%VERSION%%"
+ WriteRegDWORD HKLM ${uninstall} "NoModify" 1
+ WriteRegDWORD HKLM ${uninstall} "NoRepair" 1
+
+ ${GetSize} "$INSTDIR" "/S=0K" $0 $1 $2
+ IntFmt $0 "0x%08X" $0
+ WriteRegDWORD HKLM "${uninstall}" "EstimatedSize" "$0"
+
+ WriteUninstaller "uninstall.exe"
+SectionEnd
+
+;--------------------------------
+
+Section "RabbitMQ Service" RabbitService
+ ExpandEnvStrings $0 %COMSPEC%
+ ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" install'
+ ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" start'
+ CopyFiles "$WINDIR\.erlang.cookie" "$PROFILE\.erlang.cookie"
+SectionEnd
+
+;--------------------------------
+
+Section "Start Menu" RabbitStartMenu
+ ; In case the service is not installed, or the service installation fails,
+ ; make sure these exist or Explorer will get confused.
+ CreateDirectory "$APPDATA\RabbitMQ\log"
+ CreateDirectory "$APPDATA\RabbitMQ\db"
+
+ CreateDirectory "$SMPROGRAMS\RabbitMQ Server"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Uninstall.lnk" "$INSTDIR\uninstall.exe" "" "$INSTDIR\uninstall.exe" 0
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Plugins Directory.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\plugins"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Log Directory.lnk" "$APPDATA\RabbitMQ\log"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Database Directory.lnk" "$APPDATA\RabbitMQ\db"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\(Re)Install Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "install" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Remove Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "remove" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Start Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "start" "$INSTDIR\rabbitmq.ico"
+ CreateShortCut "$SMPROGRAMS\RabbitMQ Server\Stop Service.lnk" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" "stop" "$INSTDIR\rabbitmq.ico"
+
+SectionEnd
+
+;--------------------------------
+
+; Section descriptions
+
+LangString DESC_Rabbit ${LANG_ENGLISH} "The RabbitMQ Server."
+LangString DESC_RabbitService ${LANG_ENGLISH} "Set up RabbitMQ as a Windows Service."
+LangString DESC_RabbitStartMenu ${LANG_ENGLISH} "Add some useful links to the start menu."
+
+!insertmacro MUI_FUNCTION_DESCRIPTION_BEGIN
+ !insertmacro MUI_DESCRIPTION_TEXT ${Rabbit} $(DESC_Rabbit)
+ !insertmacro MUI_DESCRIPTION_TEXT ${RabbitService} $(DESC_RabbitService)
+ !insertmacro MUI_DESCRIPTION_TEXT ${RabbitStartMenu} $(DESC_RabbitStartMenu)
+!insertmacro MUI_FUNCTION_DESCRIPTION_END
+
+;--------------------------------
+
+; Uninstaller
+
+Section "Uninstall"
+
+ ; Remove registry keys
+ DeleteRegKey HKLM ${uninstall}
+ DeleteRegKey HKLM "SOFTWARE\VMware, Inc.\RabbitMQ Server"
+
+ ; TODO these will fail if the service is not installed - do we care?
+ ExpandEnvStrings $0 %COMSPEC%
+ ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" stop'
+ ExecWait '"$0" /C "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin\rabbitmq-service.bat" remove'
+
+ ; Remove from PATH
+ ${un.EnvVarUpdate} $0 "PATH" "R" "HKLM" "$INSTDIR\rabbitmq_server-%%VERSION%%\sbin"
+
+ ; Remove files and uninstaller
+ RMDir /r "$INSTDIR\rabbitmq_server-%%VERSION%%"
+ Delete "$INSTDIR\rabbitmq.ico"
+ Delete "$INSTDIR\uninstall.exe"
+
+ ; Remove start menu items
+ RMDir /r "$SMPROGRAMS\RabbitMQ Server"
+
+ DeleteRegValue ${env_hklm} ERLANG_HOME
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+SectionEnd
+
+;--------------------------------
+
+; Functions
+
+Function .onInit
+ Call findErlang
+
+ ReadRegStr $0 HKLM ${uninstall} "UninstallString"
+ ${If} $0 != ""
+ MessageBox MB_OKCANCEL|MB_ICONEXCLAMATION "RabbitMQ is already installed. $\n$\nClick 'OK' to remove the previous version or 'Cancel' to cancel this installation." IDCANCEL norun
+
+ ;Run the uninstaller
+ ClearErrors
+ ExecWait $INSTDIR\uninstall.exe
+
+ norun:
+ Abort
+ ${EndIf}
+FunctionEnd
+
+Function findErlang
+
+ StrCpy $0 0
+ StrCpy $2 "not-found"
+ ${Do}
+ EnumRegKey $1 HKLM Software\Ericsson\Erlang $0
+ ${If} $1 = ""
+ ${Break}
+ ${EndIf}
+ ${If} $1 <> "ErlSrv"
+ StrCpy $2 $1
+ ${EndIf}
+
+ IntOp $0 $0 + 1
+ ${Loop}
+
+ ${If} $2 = "not-found"
+ MessageBox MB_YESNO|MB_ICONEXCLAMATION "Erlang could not be detected.$\nYou must install Erlang before installing RabbitMQ. Would you like the installer to open a browser window to the Erlang download site?" IDNO abort
+ ExecShell "open" "http://www.erlang.org/download.html"
+ abort:
+ Abort
+ ${Else}
+ ${VersionCompare} $2 "5.6.3" $0
+ ${VersionCompare} $2 "5.8.1" $1
+
+ ${If} $0 = 2
+ MessageBox MB_OK|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is too old. Please install a more recent version."
+ Abort
+ ${ElseIf} $1 = 2
+ MessageBox MB_YESNO|MB_ICONEXCLAMATION "Your installed version of Erlang ($2) is comparatively old.$\nFor best results, please install a newer version.$\nDo you wish to continue?" IDYES no_abort
+ Abort
+ no_abort:
+ ${EndIf}
+
+ ReadRegStr $0 HKLM "Software\Ericsson\Erlang\$2" ""
+
+ ; See http://nsis.sourceforge.net/Setting_Environment_Variables
+ WriteRegExpandStr ${env_hklm} ERLANG_HOME $0
+ SendMessage ${HWND_BROADCAST} ${WM_WININICHANGE} 0 "STR:Environment" /TIMEOUT=5000
+
+ ; On Windows XP changing the permanent environment does not change *our*
+ ; environment, so do that as well.
+ System::Call 'Kernel32::SetEnvironmentVariableA(t, t) i("ERLANG_HOME", "$0").r0'
+ ${EndIf}
+
+FunctionEnd \ No newline at end of file
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 5c390a5193..2f80eb96c3 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -16,7 +16,6 @@
##
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
--kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
CONFIG_FILE=/etc/rabbitmq/rabbitmq
LOG_BASE=/var/log/rabbitmq
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 0cfa5ea848..2ca9f2b37a 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -142,7 +142,6 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
+W w ^
+A30 ^
+P 1048576 ^
--kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 43520b55cb..bc452fea59 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -207,7 +207,6 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-s rabbit ^
+W w ^
+A30 ^
--kernel inet_default_listen_options "[{nodelay,true}]" ^
-kernel inet_default_connect_options "[{nodelay,true}]" ^
!RABBITMQ_LISTEN_ARG! ^
-kernel error_logger {file,\""!RABBITMQ_LOG_BASE!/!RABBITMQ_NODENAME!.log"\"} ^
diff --git a/src/delegate.erl b/src/delegate.erl
index ff55a15b40..17046201ad 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,8 +36,6 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
--spec(delegate_count/0 :: () -> non_neg_integer()).
-
-endif.
%%----------------------------------------------------------------------------
@@ -68,9 +66,9 @@ invoke(Pids, Fun) when is_list(Pids) ->
{Replies, BadNodes} =
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
- RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
- {invoke, Fun, Grouped},
- infinity)
+ RemoteNodes -> gen_server2:multi_call(
+ RemoteNodes, delegate(RemoteNodes),
+ {invoke, Fun, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
@@ -92,7 +90,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
{invoke, Fun, Grouped})
end,
safe_invoke(LocalPids, Fun), %% must not die
@@ -111,17 +109,14 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids).
-delegate_count() ->
- {ok, Count} = application:get_env(rabbit, delegate_count),
- Count.
-
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate() ->
+delegate(RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
- erlang:phash2(self(), delegate_count())),
+ erlang:phash2(self(),
+ delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 5274722145..fc693c7d3d 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/1, count/1]).
-export([init/1]).
@@ -28,20 +28,32 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(count/1 :: ([node()]) -> integer()).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link(Count) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]).
+
+count([]) ->
+ 1;
+count([Node | Nodes]) ->
+ try
+ length(supervisor:which_children({?SERVER, Node}))
+ catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
+ count(Nodes);
+ exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown;
+ R =:= nodedown ->
+ count(Nodes)
+ end.
%%----------------------------------------------------------------------------
-init(_Args) ->
- DCount = delegate:delegate_count(),
+init([Count]) ->
{ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} ||
- Num <- lists:seq(0, DCount - 1)]}}.
+ Num <- lists:seq(0, Count - 1)]}}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b041a6372c..1beed5c1a7 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,7 +27,7 @@
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0]).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
@@ -101,8 +101,7 @@
-rabbit_boot_step({delegate_sup,
[{description, "cluster delegate"},
- {mfa, {rabbit_sup, start_child,
- [delegate_sup]}},
+ {mfa, {rabbit, boot_delegate, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).
@@ -145,13 +144,18 @@
{requires, routing_ready},
{enables, networking}]}).
+-rabbit_boot_step({direct_client,
+ [{mfa, {rabbit_direct, boot, []}},
+ {requires, log_relay}]}).
+
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
- {requires, log_relay},
- {enables, networking_listening}]}).
+ {requires, log_relay}]}).
--rabbit_boot_step({networking_listening,
- [{description, "network listeners available"}]}).
+-rabbit_boot_step({notify_cluster,
+ [{description, "notify cluster nodes"},
+ {mfa, {rabbit_node_monitor, notify_cluster, []}},
+ {requires, networking}]}).
%%---------------------------------------------------------------------------
@@ -179,6 +183,9 @@
{running_nodes, [node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
+-spec(maybe_insert_default_data/0 :: () -> 'ok').
+-spec(boot_delegate/0 :: () -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -225,11 +232,11 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, SupPid} = rabbit_sup:start_link(),
+ true = register(rabbit, self()),
print_banner(),
[ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
-
{ok, SupPid};
Error ->
Error
@@ -448,6 +455,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
end
end.
+boot_delegate() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ rabbit_sup:start_child(delegate_sup, [Count]).
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ad9e3ce609..6e5aae27c7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -137,7 +137,9 @@
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit()).
+ rabbit_types:connection_exit() |
+ fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(maybe_run_queue_via_backing_queue/2 ::
(pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(maybe_run_queue_via_backing_queue_async/2 ::
@@ -215,8 +217,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[_] -> %% Q exists on stopped node
rabbit_misc:const(not_found)
end;
- [ExistingQ] ->
- rabbit_misc:const(ExistingQ)
+ [ExistingQ = #amqqueue{pid = QPid}] ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> rabbit_misc:const(ExistingQ);
+ false -> TailFun = internal_delete(QueueName),
+ fun (Tx) -> TailFun(Tx), ExistingQ end
+ end
end
end).
@@ -350,7 +356,8 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
+stat(#amqqueue{pid = QPid}) ->
+ delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
@@ -415,7 +422,7 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- delegate_cast(QPid, {notify_sent, ChPid}).
+ gen_server2:cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
@@ -432,17 +439,15 @@ internal_delete1(QueueName) ->
rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [_] -> internal_delete1(QueueName)
+ [] -> rabbit_misc:const({error, not_found});
+ [_] -> Deletions = internal_delete1(QueueName),
+ fun (Tx) -> ok = rabbit_binding:process_deletions(
+ Deletions, Tx)
+ end
end
- end,
- fun ({error, _} = Err, _Tx) ->
- Err;
- (Deletions, Tx) ->
- ok = rabbit_binding:process_deletions(Deletions, Tx)
end).
maybe_run_queue_via_backing_queue(QPid, Fun) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3418c663f4..0d8a4c92e9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -21,7 +21,7 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(BASE_MESSAGE_PROPERTIES,
@@ -122,6 +122,8 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
+ rabbit_event:notify(
+ queue_deleted, [{pid, self()}]),
BQS1 = BQ:delete_and_terminate(BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
@@ -186,7 +188,6 @@ terminate_shutdown(Fun, State) ->
end, BQS, all_ch_record()),
[emit_consumer_deleted(Ch, CTag)
|| {Ch, CTag, _} <- consumers(State1)],
- rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -617,6 +618,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
+backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
+ maybe_run_queue_via_backing_queue(
+ fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
{Guids, BQS1} = Fun(BQS),
run_message_queue(
@@ -786,20 +791,20 @@ handle_call({init, Recover}, From,
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined} = State,
- gen_server2:reply(From, not_found),
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
- end,
- BQS = BQ:init(QName, IsDurable, Recover),
- %% Rely on terminate to delete the queue.
- {stop, normal, State#q{backing_queue_state = BQS}}
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ false -> #q{backing_queue = BQ, backing_queue_state = undefined,
+ q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ gen_server2:reply(From, not_found),
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
@@ -996,10 +1001,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
noreply(maybe_run_queue_via_backing_queue(Fun, State));
-handle_cast(sync_timeout, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS),
- sync_timer_ref = undefined});
+handle_cast(sync_timeout, State) ->
+ noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -1133,9 +1136,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
-handle_info(timeout, State = #q{backing_queue = BQ}) ->
- noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State));
+handle_info(timeout, State) ->
+ noreply(backing_queue_idle_timeout(State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index b270927be3..96a22dcaf1 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -331,7 +331,7 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
maybe_auto_delete(XName, Bindings, Deletions) ->
- case mnesia:read(rabbit_exchange, XName) of
+ case mnesia:read({rabbit_exchange, XName}) of
[] ->
add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
[X] ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 91559ea6c9..a82e5eff3e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -277,7 +277,7 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info({'DOWN', _MRef, process, QPid, _Reason},
+handle_info({'DOWN', _MRef, process, QPid, Reason},
State = #ch{unconfirmed = UC}) ->
%% TODO: this does a complete scan and partial rebuild of the
%% tree, which is quite efficient. To do better we'd need to
@@ -286,8 +286,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
gb_trees:next(gb_trees:iterator(UC)), QPid,
{[], UC}, State),
erase_queue_stats(QPid),
- noreply(
- queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))).
+ State1 = case Reason of
+ normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
+ _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
+ end,
+ noreply(queue_blocked(QPid, State1)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -505,6 +508,8 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
Qs1 = sets:del_element(QPid, Qs),
+ %% these confirms will be emitted even when a queue dies, but that
+ %% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
case sets:size(Qs1) of
0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
@@ -735,16 +740,22 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover_async'{requeue = true},
- _, State = #ch{unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ,
+ limiter_pid = LimiterPid}) ->
+ OkFun = fun () -> ok end,
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
%% The Qpid python test suite incorrectly assumes
%% that messages will be requeued in their original
%% order. To keep it happy we reverse the id list
%% since we are given them in reverse order.
- rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), self())
+ rabbit_misc:with_exit_handler(
+ OkFun, fun () ->
+ rabbit_amqqueue:requeue(
+ QPid, lists:reverse(MsgIds), self())
+ end)
end, ok, UAMQ),
+ ok = notify_limiter(LimiterPid, UAMQ),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -1249,6 +1260,16 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
+send_nacks([], State) ->
+ State;
+send_nacks(MXs, State) ->
+ MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ],
+ coalesce_and_send(MsgSeqNos,
+ fun(MsgSeqNo, Multiple) ->
+ #'basic.nack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State).
+
send_confirms(State = #ch{confirmed = C}) ->
C1 = lists:append(C),
MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
@@ -1258,28 +1279,32 @@ send_confirms(State = #ch{confirmed = C}) ->
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
- send_confirm(MsgSeqNo, WriterPid),
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = MsgSeqNo}),
State;
-send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
- SCs = lists:usort(Cs),
+send_confirms(Cs, State) ->
+ coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
+ #'basic.ack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State).
+
+coalesce_and_send(MsgSeqNos, MkMsgFun,
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ SMsgSeqNos = lists:usort(MsgSeqNos),
CutOff = case gb_trees:is_empty(UC) of
- true -> lists:last(SCs) + 1;
+ true -> lists:last(SMsgSeqNos) + 1;
false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
end,
- {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
[] -> ok;
_ -> ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
- multiple = true})
+ WriterPid, MkMsgFun(lists:last(Ms), true))
end,
- [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
+ [ok = rabbit_writer:send_command(
+ WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-send_confirm(SeqNo, WriterPid) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = SeqNo}).
-
terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d426d55df5..d21cfdb7fb 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -31,9 +31,11 @@
-export_type([start_link_args/0]).
-type(start_link_args() ::
- {rabbit_types:protocol(), rabbit_net:socket(),
+ {'tcp', rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()}).
+ rabbit_types:user(), rabbit_types:vhost(), pid()} |
+ {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
+ rabbit_types:vhost(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -41,7 +43,7 @@
%%----------------------------------------------------------------------------
-start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
+start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
@@ -58,7 +60,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
- {ok, SupPid, {ChannelPid, AState}}.
+ {ok, SupPid, {ChannelPid, AState}};
+start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ClientChannelPid, ClientChannelPid,
+ User, VHost, Collector, start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, SupPid, {ChannelPid, none}}.
%%----------------------------------------------------------------------------
diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl
index 1c2bbb6548..dbdc6cd429 100644
--- a/src/tcp_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
--module(tcp_client_sup).
+-module(rabbit_client_sup).
-behaviour(supervisor2).
@@ -22,6 +22,21 @@
-export([init/1]).
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (mfa()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+ rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(Callback) ->
supervisor2:start_link(?MODULE, Callback).
@@ -29,6 +44,5 @@ start_link(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
- [{tcp_client, {M,F,A},
- temporary, infinity, supervisor, [M]}]}}.
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
new file mode 100644
index 0000000000..3b8c9fba39
--- /dev/null
+++ b/src/rabbit_direct.erl
@@ -0,0 +1,75 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_direct).
+
+-export([boot/0, connect/3, start_channel/5]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(boot/0 :: () -> 'ok').
+-spec(connect/3 :: (binary(), binary(), binary()) ->
+ {'ok', {rabbit_types:user(),
+ rabbit_framing:amqp_table()}}).
+-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
+ rabbit_types:user(), rabbit_types:vhost(), pid()) ->
+ {'ok', pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+boot() ->
+ {ok, _} =
+ supervisor2:start_child(
+ rabbit_sup,
+ {rabbit_direct_client_sup,
+ {rabbit_client_sup, start_link,
+ [{local, rabbit_direct_client_sup},
+ {rabbit_channel_sup, start_link, []}]},
+ transient, infinity, supervisor, [rabbit_client_sup]}),
+ ok.
+
+%%----------------------------------------------------------------------------
+
+connect(Username, Password, VHost) ->
+ case lists:keymember(rabbit, 1, application:which_applications()) of
+ true ->
+ try rabbit_access_control:user_pass_login(Username, Password) of
+ #user{} = User ->
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ ok -> {ok, {User, rabbit_reader:server_properties()}}
+ catch
+ exit:#amqp_error{name = access_refused} ->
+ {error, access_refused}
+ end
+ catch
+ exit:#amqp_error{name = access_refused} -> {error, auth_failure}
+ end;
+ false ->
+ {error, broker_not_found_on_node}
+ end.
+
+start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+ {ok, _, {ChannelPid, _}} =
+ supervisor2:start_child(
+ rabbit_direct_client_sup,
+ [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ {ok, ChannelPid}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3a4fb024fe..abc27c5f7d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([lock_file/1]).
-export([const_ok/1, const/1]).
-export([ntoa/1, ntoab/1]).
+-export([is_process_alive/1]).
%%----------------------------------------------------------------------------
@@ -194,6 +195,7 @@
-spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
-endif.
@@ -240,11 +242,20 @@ assert_args_equivalence1(Orig, New, Name, Key) ->
{Same, Same} -> ok;
{Orig1, New1} -> protocol_error(
precondition_failed,
- "inequivalent arg '~s' for ~s: "
- "required ~w, received ~w",
- [Key, rabbit_misc:rs(Name), New1, Orig1])
+ "inequivalent arg '~s' for ~s: "
+ "received ~s but current is ~s",
+ [Key, rs(Name), val(New1), val(Orig1)])
end.
+val(undefined) ->
+ "none";
+val({Type, Value}) ->
+ Fmt = case is_binary(Value) of
+ true -> "the value '~s' of type '~s'";
+ false -> "the value '~w' of type '~s'"
+ end,
+ lists:flatten(io_lib:format(Fmt, [Value, Type])).
+
dirty_read(ReadSpec) ->
case mnesia:dirty_read(ReadSpec) of
[Result] -> {ok, Result};
@@ -341,8 +352,11 @@ throw_on_error(E, Thunk) ->
with_exit_handler(Handler, Thunk) ->
try
Thunk()
- catch exit:{R, _} when R =:= noproc; R =:= nodedown;
- R =:= normal; R =:= shutdown ->
+ catch
+ exit:{R, _} when R =:= noproc; R =:= nodedown;
+ R =:= normal; R =:= shutdown ->
+ Handler();
+ exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
Handler()
end.
@@ -849,3 +863,12 @@ ntoab(IP) ->
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.
+
+is_process_alive(Pid) when node(Pid) =:= node() ->
+ erlang:is_process_alive(Pid);
+is_process_alive(Pid) ->
+ case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
+ true -> true;
+ _ -> false
+ end.
+
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 529e3e0706..7f3cf35faa 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,7 +33,7 @@
-include("rabbit_msg_store.hrl").
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
@@ -738,45 +738,36 @@ handle_call({contains, Guid}, From, State) ->
handle_cast({client_dying, CRef},
State = #msstate { dying_clients = DyingClients }) ->
DyingClients1 = sets:add_element(CRef, DyingClients),
- write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+ noreply(write_message(CRef, <<>>,
+ State #msstate { dying_clients = DyingClients1 }));
handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, Guid},
- State = #msstate { file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- case should_mask_action(CRef, Guid, State) of
- {true, _Location} ->
- noreply(State);
- {false, not_found} ->
- write_message(CRef, Guid, Msg, State);
- {Mask, #msg_location { ref_count = 0, file = File,
- total_size = TotalSize }} ->
- case {Mask, ets:lookup(FileSummaryEts, File)} of
- {false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
- write_message(CRef, Guid, Msg, State);
- {false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client death
- %% message, but as it is being GC'd currently,
- %% we'll have to write a new copy, which will then
- %% be younger, so ignore this write.
- noreply(State);
- {_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
- State1 = client_confirm_if_on_disk(CRef, Guid, File, State),
- noreply(adjust_valid_total_size(File, TotalSize, State1))
- end;
- {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
- %% We already know about it, just update counter. Only
- %% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State),
- noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
- end;
+ noreply(
+ case write_action(should_mask_action(CRef, Guid, State), Guid, State) of
+ {write, State1} ->
+ write_message(CRef, Guid, Msg, State1);
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, Guid, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTG) ->
+ MsgOnDiskFun(gb_sets:singleton(Guid), written),
+ CTG
+ end, CRef, State1)
+ end);
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
@@ -924,6 +915,37 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
[client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+write_action({true, not_found}, _Guid, State) ->
+ {ignore, undefined, State};
+write_action({true, #msg_location { file = File }}, _Guid, State) ->
+ {ignore, File, State};
+write_action({false, not_found}, _Guid, State) ->
+ {write, State};
+write_action({Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }},
+ Guid, State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
+ ok = index_delete(Guid, State),
+ {write, State};
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently we'll have
+ %% to write a new copy, which will then be younger, so
+ %% ignore this write.
+ {ignore, File, State};
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ State1 = adjust_valid_total_size(File, TotalSize, State),
+ {confirm, File, State1}
+ end;
+write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
+ Guid, State) ->
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ %% We already know about it, just update counter. Only update
+ %% field otherwise bad interaction with concurrent GC
+ {confirm, File, State}.
+
write_message(CRef, Guid, Msg, State) ->
write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
@@ -943,11 +965,10 @@ write_message(Guid, Msg,
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
- NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize })).
+ maybe_roll_to_new_file(CurOffset + TotalSize,
+ State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }).
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -1134,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) ->
gb_sets:singleton(Guid), CTG)
end, CRef, State).
-client_confirm_if_on_disk(CRef, Guid, CurFile,
- State = #msstate { current_file = CurFile }) ->
- record_pending_confirm(CRef, Guid, State);
-client_confirm_if_on_disk(CRef, Guid, _File, State) ->
- update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(gb_sets:singleton(Guid), written),
- CTG
- end, CRef, State).
-
client_confirm(CRef, Guids, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTG) ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 031d4f181d..36f61628b8 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -32,16 +32,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--define(RABBIT_TCP_OPTS, [
- binary,
- {packet, raw}, % no packaging
- {reuseaddr, true}, % allow rebind without waiting
- {backlog, 128}, % use the maximum listen(2) backlog value
- %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
- %% {delay_send, true},
- {exit_on_close, false}
- ]).
-
-define(SSL_TIMEOUT, 5). %% seconds
-define(FIRST_TEST_BIND_PORT, 10000).
@@ -115,13 +105,13 @@ boot_ssl() ->
end.
start() ->
- {ok,_} = supervisor:start_child(
+ {ok,_} = supervisor2:start_child(
rabbit_sup,
{rabbit_tcp_client_sup,
- {tcp_client_sup, start_link,
+ {rabbit_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]},
- transient, infinity, supervisor, [tcp_client_sup]}),
+ transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
%% inet_parse:address takes care of ip string, like "0.0.0.0"
@@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS],
+ [IPAddress, Port, [Family | tcp_opts()],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]},
@@ -315,6 +305,10 @@ hostname() ->
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
+tcp_opts() ->
+ {ok, Opts} = application:get_env(rabbit, tcp_listen_options),
+ Opts.
+
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index e4bc1cdc5a..817abaa2bd 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -22,14 +22,41 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([notify_cluster/0, rabbit_running_on/1]).
-define(SERVER, ?MODULE).
+-define(RABBIT_UP_RPC_TIMEOUT, 2000).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(rabbit_running_on/1 :: (node()) -> 'ok').
+-spec(notify_cluster/0 :: () -> 'ok').
+
+-endif.
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+rabbit_running_on(Node) ->
+ gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+
+notify_cluster() ->
+ Node = node(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+ %% notify other rabbits of this rabbit
+ case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
+ [Node], ?RABBIT_UP_RPC_TIMEOUT) of
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ %% register other active rabbits with this rabbit
+ [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
+ ok.
+
%%--------------------------------------------------------------------
init([]) ->
@@ -39,19 +66,20 @@ init([]) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
+handle_cast({rabbit_running_on, Node}, State) ->
+ rabbit_log:info("node ~p up~n", [Node]),
+ erlang:monitor(process, {rabbit, Node}),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({nodeup, Node}, State) ->
- rabbit_log:info("node ~p up", [Node]),
- {noreply, State};
handle_info({nodedown, Node}, State) ->
- rabbit_log:info("node ~p down", [Node]),
- %% TODO: This may turn out to be a performance hog when there are
- %% lots of nodes. We really only need to execute this code on
- %% *one* node, rather than all of them.
- ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_amqqueue:on_node_down(Node),
+ rabbit_log:info("node ~p down~n", [Node]),
+ ok = handle_dead_rabbit(Node),
+ {noreply, State};
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
+ rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
+ ok = handle_dead_rabbit(Node),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -64,3 +92,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
+%% TODO: This may turn out to be a performance hog when there are
+%% lots of nodes. We really only need to execute this code on
+%% *one* node, rather than all of them.
+handle_dead_rabbit(Node) ->
+ ok = rabbit_networking:on_node_down(Node),
+ ok = rabbit_amqqueue:on_node_down(Node).
+
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 52e0bb9dd5..1781469a13 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -72,7 +72,13 @@
%% pre-init:
%% receive protocol header -> send connection.start, *starting*
%% starting:
-%% receive connection.start_ok -> send connection.tune, *tuning*
+%% receive connection.start_ok -> *securing*
+%% securing:
+%% check authentication credentials
+%% if authentication success -> send connection.tune, *tuning*
+%% if more challenge needed -> send connection.secure,
+%% receive connection.secure_ok *securing*
+%% otherwise send close, *exit*
%% tuning:
%% receive connection.tune_ok -> start heartbeats, *opening*
%% opening:
@@ -351,7 +357,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
throw({handshake_timeout, State#v1.callback})
end;
timeout ->
- throw({timeout, State#v1.connection_state});
+ case State#v1.connection_state of
+ closed -> mainloop(Deb, State);
+ S -> throw({timeout, S})
+ end;
{'$gen_call', From, {shutdown, Explanation}} ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -916,10 +925,14 @@ socket_info(Get, Select) ->
end.
ssl_info(F, Sock) ->
+ %% The first ok form is R14
+ %% The second is R13 - the extra term is exportability (by inspection,
+ %% the docs are wrong)
case rabbit_net:ssl_info(Sock) of
- nossl -> '';
- {error, _} -> '';
- {ok, Info} -> F(Info)
+ nossl -> '';
+ {error, _} -> '';
+ {ok, {P, {K, C, H}}} -> F({P, {K, C, H}});
+ {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
end.
cert_info(F, Sock) ->
@@ -940,8 +953,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {Protocol, Sock, Channel, FrameMax,
- self(), User, VHost, Collector}),
+ ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
+ VHost, Collector}),
erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 49b0950832..58c369b5a3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -26,6 +26,7 @@
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
test_content_prop_roundtrip(Datum, Binary) ->
Types = [element(1, E) || E <- Datum],
@@ -80,20 +81,24 @@ run_cluster_dependent_tests(SecondaryNode) ->
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
+ passed = test_queue_cleanup(SecondaryNode),
+ passed = test_declare_on_dead_queue(SecondaryNode),
%% we now run the tests remotely, so that code coverage on the
%% local node picks up more of the delegate
Node = node(),
Self = self(),
Remote = spawn(SecondaryNode,
- fun () -> A = test_delegates_async(Node),
- B = test_delegates_sync(Node),
- Self ! {self(), {A, B}}
+ fun () -> Rs = [ test_delegates_async(Node),
+ test_delegates_sync(Node),
+ test_queue_cleanup(Node),
+ test_declare_on_dead_queue(Node) ],
+ Self ! {self(), Rs}
end),
receive
{Remote, Result} ->
- Result = {passed, passed}
- after 2000 ->
+ Result = lists:duplicate(length(Result), passed)
+ after 30000 ->
throw(timeout)
end,
@@ -1278,6 +1283,61 @@ test_delegates_sync(SecondaryNode) ->
passed.
+test_queue_cleanup_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_queue_cleanup_receiver(Pid)
+ end.
+
+
+test_queue_cleanup(_SecondaryNode) ->
+ {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1),
+ rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
+ receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
+ ok
+ after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ rabbit:stop(),
+ rabbit:start(),
+ rabbit_channel:do(Ch, #'queue.declare'{ passive = true,
+ queue = ?CLEANUP_QUEUE_NAME }),
+ receive
+ {channel_exit, 1, {amqp_error, not_found, _, _}} ->
+ ok
+ after 2000 ->
+ throw(failed_to_receive_channel_exit)
+ end,
+ passed.
+
+test_declare_on_dead_queue(SecondaryNode) ->
+ QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
+ Self = self(),
+ Pid = spawn(SecondaryNode,
+ fun () ->
+ {new, #amqqueue{name = QueueName, pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ exit(QPid, kill),
+ Self ! {self(), killed, QPid}
+ end),
+ receive
+ {Pid, killed, QPid} ->
+ {existing, #amqqueue{name = QueueName,
+ pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [], none),
+ false = rabbit_misc:is_process_alive(QPid),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ {ok, 0} = rabbit_amqqueue:delete(Q, false, false),
+ passed
+ after 2000 ->
+ throw(failed_to_create_and_kill_queue)
+ end.
+
%---------------------------------------------------------------------
control_action(Command, Args) ->
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 18e2bdadb9..1a240856ce 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -359,8 +359,8 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
{noreply, NState};
handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) ->
case get_child(Child#child.name, State) of
- {value, Child} ->
- {ok, NState} = do_restart(RestartType, Reason, Child, State),
+ {value, Child1} ->
+ {ok, NState} = do_restart(RestartType, Reason, Child1, State),
{noreply, NState};
_ ->
{noreply, State}
@@ -539,7 +539,7 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
{ok, _TRef} = timer:apply_after(
trunc(Delay*1000), ?MODULE, delayed_restart,
[self(), {{RestartType, Delay}, Reason, Child}]),
- {ok, NState}
+ {ok, state_del_child(Child, NState)}
end;
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 76be63d0cf..b4df1fd042 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -59,19 +59,21 @@ start_child() ->
ping_child(SupPid) ->
Ref = make_ref(),
- get_child_pid(SupPid) ! {ping, Ref, self()},
+ with_child_pid(SupPid, fun(ChildPid) -> ChildPid ! {ping, Ref, self()} end),
receive {pong, Ref} -> ok
after 1000 -> timeout
end.
exit_child(SupPid) ->
- true = exit(get_child_pid(SupPid), abnormal),
+ with_child_pid(SupPid, fun(ChildPid) -> exit(ChildPid, abnormal) end),
ok.
-get_child_pid(SupPid) ->
- [{_Id, ChildPid, worker, [test_sup]}] =
- supervisor2:which_children(SupPid),
- ChildPid.
+with_child_pid(SupPid, Fun) ->
+ case supervisor2:which_children(SupPid) of
+ [{_Id, undefined, worker, [test_sup]}] -> ok;
+ [{_Id, ChildPid, worker, [test_sup]}] -> Fun(ChildPid);
+ [] -> ok
+ end.
run_child() ->
receive {ping, Ref, Pid} -> Pid ! {pong, Ref},