summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/file_handle_cache.erl11
-rw-r--r--src/rabbit_auth_backend_internal.erl24
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl12
-rw-r--r--src/rabbit_auth_mechanism_plain.erl5
-rw-r--r--src/rabbit_queue_index.erl123
-rw-r--r--src/rabbit_reader.erl3
6 files changed, 86 insertions, 92 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index b26bb9884b..4f0365718f 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -1149,11 +1149,12 @@ notify_age(CStates, AverageAge) ->
end, CStates).
notify_age0(Clients, CStates, Required) ->
- Notifications =
- [CState || CState <- CStates, CState#cstate.callback =/= undefined],
- {L1, L2} = lists:split(random:uniform(length(Notifications)),
- Notifications),
- notify(Clients, Required, L2 ++ L1).
+ case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of
+ [] -> ok;
+ Notifications -> S = random:uniform(length(Notifications)),
+ {L1, L2} = lists:split(S, Notifications),
+ notify(Clients, Required, L2 ++ L1)
+ end.
notify(_Clients, _Required, []) ->
ok;
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 3d00584551..f70813d1e7 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -85,10 +85,9 @@ check_user_login(Username, []) ->
internal_check_user_login(Username, fun(_) -> true end);
check_user_login(Username, [{password, Password}]) ->
internal_check_user_login(
- Username,
- fun(#internal_user{password_hash = Hash}) ->
- check_password(Password, Hash)
- end);
+ Username, fun(#internal_user{password_hash = Hash}) ->
+ check_password(Password, Hash)
+ end);
check_user_login(Username, AuthProps) ->
exit({unknown_auth_props, Username, AuthProps}).
@@ -131,12 +130,11 @@ check_resource_access(#user{username = Username},
[] ->
false;
[#user_permission{permission = P}] ->
- PermRegexp =
- case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
+ PermRegexp = case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
case re:run(Name, PermRegexp, [{capture, none}]) of
match -> true;
nomatch -> false
@@ -221,11 +219,9 @@ salted_md5(Salt, Cleartext) ->
Salted = <<Salt/binary, Cleartext/binary>>,
erlang:md5(Salted).
-set_admin(Username) ->
- set_admin(Username, true).
+set_admin(Username) -> set_admin(Username, true).
-clear_admin(Username) ->
- set_admin(Username, false).
+clear_admin(Username) -> set_admin(Username, false).
set_admin(Username, IsAdmin) ->
R = update_user(Username, fun(User) ->
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index 77aa34ea0a..acbb6e48f6 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -53,10 +53,8 @@ handle_response(Response, State = #state{username = undefined}) ->
{challenge, <<"Please tell me your password">>,
State#state{username = Response}};
-handle_response(Response, #state{username = Username}) ->
- case Response of
- <<"My password is ", Password/binary>> ->
- rabbit_access_control:check_user_pass_login(Username, Password);
- _ ->
- {protocol_error, "Invalid response '~s'", [Response]}
- end.
+handle_response(<<"My password is ", Password/binary>>,
+ #state{username = Username}) ->
+ rabbit_access_control:check_user_pass_login(Username, Password);
+handle_response(Response, _State) ->
+ {protocol_error, "Invalid response '~s'", [Response]}.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index e2f9bff9c5..2448acb660 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -65,15 +65,12 @@ extract_user_pass(Response) ->
end.
extract_elem(<<0:8, Rest/binary>>) ->
- Count = next_null_pos(Rest),
+ Count = next_null_pos(Rest, 0),
<<Elem:Count/binary, Rest1/binary>> = Rest,
{ok, Elem, Rest1};
extract_elem(_) ->
error.
-next_null_pos(Bin) ->
- next_null_pos(Bin, 0).
-
next_null_pos(<<>>, Count) -> Count;
next_null_pos(<<0:8, _Rest/binary>>, Count) -> Count;
next_null_pos(<<_:8, Rest/binary>>, Count) -> next_null_pos(Rest, Count + 1).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 8227e4cd95..33c5391b20 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -126,13 +126,13 @@
%% (range: 0 - 16383)
-define(REL_SEQ_ONLY_PREFIX, 00).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
--define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).
+-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
%% of md5sum msg id
--define(PUBLISH_PREFIX, 1).
--define(PUBLISH_PREFIX_BITS, 1).
+-define(PUB_PREFIX, 1).
+-define(PUB_PREFIX_BITS, 1).
-define(EXPIRY_BYTES, 8).
-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
@@ -140,13 +140,15 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
-%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + ?EXPIRY_BYTES + 2).
+
+%% 16 bytes for md5sum + 8 for expiry
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)).
+%% + 2 for seq, bits and prefix
+-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUBLISH_RECORD_LENGTH_BYTES +
- (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))).
+ (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
%% ---- misc ----
@@ -537,27 +539,21 @@ queue_index_walker_reader(QueueName, Gatherer) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties{expiry = Expiry}) ->
+create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) ->
[MsgId, expiry_to_binary(Expiry)].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-read_pub_record_body(Hdl) ->
- case file_handle_cache:read(Hdl, ?MSG_ID_BYTES + ?EXPIRY_BYTES) of
- {ok, Bin} ->
- %% work around for binary data fragmentation. See
- %% rabbit_msg_file:read_next/2
- <<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
- <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
- Exp = case Expiry of
- ?NO_EXPIRY -> undefined;
- X -> X
- end,
- {MsgId, #message_properties{expiry = Exp}};
- Error ->
- Error
- end.
+parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
+ %% work around for binary data fragmentation. See
+ %% rabbit_msg_file:read_next/2
+ <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
+ Exp = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ {MsgId, #message_properties { expiry = Exp }}.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -680,15 +676,16 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
?ACK_JPREFIX ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
- case read_pub_record_body(Hdl) of
- {MsgId, MsgProps} ->
- Publish = {MsgId, MsgProps,
- case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end},
+ case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
+ {ok, Bin} ->
+ {MsgId, MsgProps} = parse_pub_record_body(Bin),
+ IsPersistent = case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end,
load_journal_entries(
- add_to_journal(SeqId, Publish, State));
+ add_to_journal(
+ SeqId, {MsgId, MsgProps, IsPersistent}, State));
_ErrOrEoF -> %% err, we've lost at least a publish
State
end
@@ -798,7 +795,7 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok;
{MsgId, MsgProps, IsPersistent} ->
file_handle_cache:append(
- Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
+ Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
RelSeq:?REL_SEQ_BITS>>,
create_pub_record_body(MsgId, MsgProps)])
@@ -845,36 +842,40 @@ load_segment(KeepAcked, #segment { path = Path }) ->
false -> {array_new(), 0};
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
+ {ok, SegData} = file_handle_cache:read(
+ Hdl, ?SEGMENT_TOTAL_SIZE),
+ Res = load_segment_entries(KeepAcked, SegData, array_new(), 0),
ok = file_handle_cache:close(Hdl),
Res
end.
-load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
- case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
- {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- {MsgId, MsgProps} = read_pub_record_body(Hdl),
- Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, Hdl, SegEntries1,
- UnackedCount + 1);
- {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
- {UnackedCountDelta, SegEntries1} =
- case array:get(RelSeq, SegEntries) of
- {Pub, no_del, no_ack} ->
- { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
- {Pub, del, no_ack} when KeepAcked ->
- {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
- {_Pub, del, no_ack} ->
- {-1, array:reset(RelSeq, SegEntries)}
- end,
- load_segment_entries(KeepAcked, Hdl, SegEntries1,
- UnackedCount + UnackedCountDelta);
- _ErrOrEoF ->
- {SegEntries, UnackedCount}
- end.
+load_segment_entries(KeepAcked,
+ <<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
+ SegData/binary>>,
+ SegEntries, UnackedCount) ->
+ {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
+ Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1);
+load_segment_entries(KeepAcked,
+ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, SegData/binary>>,
+ SegEntries, UnackedCount) ->
+ {UnackedCountDelta, SegEntries1} =
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
+ {Pub, del, no_ack} when KeepAcked ->
+ {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
+ {_Pub, del, no_ack} ->
+ {-1, array:reset(RelSeq, SegEntries)}
+ end,
+ load_segment_entries(KeepAcked, SegData, SegEntries1,
+ UnackedCount + UnackedCountDelta);
+load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) ->
+ {SegEntries, UnackedCount}.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
@@ -1008,11 +1009,11 @@ add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
add_queue_ttl_journal(_) ->
stop.
-add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
+add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary,
Rest/binary>>) ->
- {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest};
+ {[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>,
+ MsgId, expiry_to_binary(undefined)], Rest};
add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS, Rest>>) ->
{<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5afe556091..609bb43ffe 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -201,7 +201,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
- client_properties = none},
+ client_properties = none,
+ capabilities = []},
callback = uninitialized_callback,
recv_length = 0,
recv_ref = none,