summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-14 18:00:03 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-14 18:00:03 +0100
commite3150cae57e5638391390dbfc608dc138c79b0cc (patch)
treedfcac3536afc4354021113f8a3ef2223d3bb731b /src
parentaeaf352012fe114b3e50cf90bd0aca0c18e85b74 (diff)
downloadrabbitmq-server-git-e3150cae57e5638391390dbfc608dc138c79b0cc.tar.gz
An implementation of the queue using database. This will only work with postgres - it really is impossible to make this kinda thing db agnostic. Performance is terrible, but it does work, and has an identical API to the rabbit_disk_queue
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_db_queue.erl414
-rw-r--r--src/rabbit_db_queue_schema.sql22
2 files changed, 436 insertions, 0 deletions
diff --git a/src/rabbit_db_queue.erl b/src/rabbit_db_queue.erl
new file mode 100644
index 0000000000..bd6820d5d0
--- /dev/null
+++ b/src/rabbit_db_queue.erl
@@ -0,0 +1,414 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% So, assuming you're on some debian linux type system,
+%% apt-get install postgresql odbc-postgresql unixodbc unixodbc-bin
+%% sudo odbcinst -i -d -f /usr/share/psqlodbc/odbcinst.ini.template
+
+%% Now set up in postgresql a user and a database that user can
+%% access. For example, the database could be called rabbit_db_queue
+%% and the username could be rabbit and the password could be rabbit.
+
+%% sudo ODBCConfig
+%% set up a system wide dsn with the above settings in it.
+%% now drop into the erlang shell, and you should not get an error after:
+
+%% > odbc:start().
+%% < ok.
+%% > odbc:connect("DSN=rabbit_db_queue", []).
+%% < {ok,<0.325.0>}
+%% ( replace rabbit_db_queue with the name of your DSN that you configured )
+
+%% the connection string (eg "DSN=rabbit_db_queue") is what you pass
+%% to start_link. Don't just pass the DSN name.
+
+-module(rabbit_db_queue).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([publish/3, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]).
+
+-export([stop/0, stop_and_obliterate/0]).
+
+-include_lib("stdlib/include/qlc.hrl").
+-include("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+%% ---- SPECS ----
+
+-ifdef(use_specs).
+
+-type(seq_id() :: non_neg_integer()).
+
+-spec(start_link/1 :: (non_neg_integer()) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok').
+-spec(deliver/1 :: (queue_name()) ->
+ {'empty' | {msg_id(), binary(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}}}).
+-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
+-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
+-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
+-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
+-spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok').
+-spec(stop/0 :: () -> 'ok').
+-spec(stop_and_obliterate/0 :: () -> 'ok').
+
+-endif.
+
+%% ---- PUBLIC API ----
+
+start_link(DSN) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE,
+ [DSN], []).
+
+publish(Q, MsgId, Msg) when is_binary(Msg) ->
+ gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
+
+deliver(Q) ->
+ gen_server:call(?SERVER, {deliver, Q}, infinity).
+
+ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}).
+
+tx_publish(MsgId, Msg) when is_binary(Msg) ->
+ gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
+
+tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
+ gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
+
+tx_cancel(MsgIds) when is_list(MsgIds) ->
+ gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
+
+requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}).
+
+stop() ->
+ gen_server:call(?SERVER, stop, infinity).
+
+stop_and_obliterate() ->
+ gen_server:call(?SERVER, stop_vaporise, infinity).
+
+%% ---- GEN-SERVER INTERNAL API ----
+-record(dbstate, { db_conn }).
+
+init([DSN]) ->
+ process_flag(trap_exit, true),
+ odbc:start(),
+ {ok, Conn} = odbc:connect(DSN, [{auto_commit, off}, {tuple_row, on},
+ {scrollable_cursors, off}, {trace_driver, off}]),
+ State = #dbstate { db_conn = Conn },
+ compact_already_delivered(State),
+ {ok, State}.
+
+handle_call({deliver, Q}, _From, State) ->
+ {ok, Result, State1} = internal_deliver(Q, State),
+ {reply, Result, State1};
+handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
+ {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State),
+ {reply, ok, State1};
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}; %% gen_server now calls terminate
+handle_call(stop_vaporise, _From, State = #dbstate { db_conn = Conn }) ->
+ odbc:sql_query(Conn, "delete from ledger"),
+ odbc:sql_query(Conn, "delete from sequence"),
+ odbc:sql_query(Conn, "delete from message"),
+ odbc:commit(Conn, commit),
+ {stop, normal, ok, State}.
+ %% gen_server now calls terminate, which then calls shutdown
+
+handle_cast({publish, Q, MsgId, MsgBody}, State) ->
+ {ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
+ {noreply, State1};
+handle_cast({ack, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_ack(Q, MsgSeqIds, State),
+ {noreply, State1};
+handle_cast({tx_publish, MsgId, MsgBody}, State) ->
+ {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+ {noreply, State1};
+handle_cast({tx_cancel, MsgIds}, State) ->
+ {ok, State1} = internal_tx_cancel(MsgIds, State),
+ {noreply, State1};
+handle_cast({requeue, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_requeue(Q, MsgSeqIds, State),
+ {noreply, State1}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ shutdown(State).
+
+shutdown(State = #dbstate { db_conn = Conn }) ->
+ odbc:disconnect(Conn),
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ---- UTILITY FUNCTIONS ----
+
+binary_to_escaped_string(Bin) when is_binary(Bin) ->
+ "E'" ++ lists:flatten(lists:reverse(binary_to_escaped_string(Bin, []))) ++ "'".
+
+binary_to_escaped_string(<<>>, Acc) ->
+ Acc;
+binary_to_escaped_string(<<Byte:8, Rest/binary>>, Acc) ->
+ binary_to_escaped_string(Rest, [escape_byte(Byte) | Acc]).
+
+escape_byte(39) ->
+ "\\\\047";
+escape_byte(92) ->
+ "\\\\134";
+escape_byte(B) when B > 31 andalso B < 127 ->
+ B;
+escape_byte(B) ->
+ case io_lib:format("~.8B", [B]) of
+ O1 = [[_]] ->
+ "\\\\00" ++ O1;
+ O2 = [[_,_]] ->
+ "\\\\0" ++ O2;
+ O3 = [[_,_,_]] ->
+ "\\\\" ++ O3
+ end.
+
+escaped_string_to_binary(Str) when is_list(Str) ->
+ list_to_binary(lists:reverse(escaped_string_to_binary(Str, []))).
+
+escaped_string_to_binary([], Acc) ->
+ Acc;
+escaped_string_to_binary([$\\,$\\|Rest], Acc) ->
+ escaped_string_to_binary(Rest, [$\\ | Acc]);
+escaped_string_to_binary([$\\,A,B,C|Rest], Acc) ->
+ escaped_string_to_binary(Rest, [(list_to_integer([A])*64) +
+ (list_to_integer([B])*8) +
+ list_to_integer([C])
+ | Acc]);
+escaped_string_to_binary([C|Rest], Acc) ->
+ escaped_string_to_binary(Rest, [C|Acc]).
+
+hex_string_to_binary(Str) when is_list(Str) ->
+ list_to_binary(lists:reverse(hex_string_to_binary(Str, []))).
+
+hex_string_to_binary([], Acc) ->
+ Acc;
+hex_string_to_binary([A,B|Rest], Acc) ->
+ {ok, [N], []} = io_lib:fread("~16u", [A,B]),
+ hex_string_to_binary(Rest, [N | Acc]).
+
+%% ---- INTERNAL RAW FUNCTIONS ----
+
+internal_deliver(Q, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of
+ {selected, _, []} ->
+ odbc:commit(Conn, commit),
+ {ok, empty, State};
+ {selected, _, [{ReadSeqId}]} ->
+ case odbc:sql_query(Conn, "select is_delivered, msg_id from ledger where queue = " ++ QStr ++
+ " and seq_id = " ++ integer_to_list(ReadSeqId)) of
+ {selected, _, []} ->
+ {ok, empty, State};
+ {selected, _, [{IsDeliveredStr, MsgIdStr}]} ->
+ IsDelivered = IsDeliveredStr /= "0",
+ if IsDelivered -> ok;
+ true -> odbc:sql_query(Conn, "update ledger set is_delivered = true where queue = " ++
+ QStr ++ " and seq_id = " ++ integer_to_list(ReadSeqId))
+ end,
+ MsgId = binary_to_term(hex_string_to_binary(MsgIdStr)),
+ %% yeah, this is really necessary. sigh
+ MsgIdStr2 = binary_to_escaped_string(term_to_binary(MsgId)),
+ {selected, _, [{MsgBodyStr}]} =
+ odbc:sql_query(Conn, "select msg from message where msg_id = " ++ MsgIdStr2),
+ odbc:sql_query(Conn, "update sequence set next_read = " ++ integer_to_list(ReadSeqId + 1) ++
+ " where queue = " ++ QStr),
+ odbc:commit(Conn, commit),
+ MsgBody = hex_string_to_binary(MsgBodyStr),
+ BodySize = size(MsgBody),
+ {ok, {MsgId, MsgBody, BodySize, IsDelivered, {MsgId, ReadSeqId}}, State}
+ end
+ end.
+
+internal_ack(Q, MsgSeqIds, State) ->
+ remove_messages(Q, MsgSeqIds, true, State).
+
+%% Q is only needed if LedgerDelete /= false
+%% called from tx_cancel with LedgerDelete = false
+%% called from internal_tx_cancel with LedgerDelete = true
+%% called from ack with LedgerDelete = true
+remove_messages(Q, MsgSeqIds, LedgerDelete, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ lists:foreach(
+ fun ({MsgId, SeqId}) ->
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ {selected, _, [{RefCount}]} =
+ odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++
+ MsgIdStr),
+ case RefCount of
+ 1 -> odbc:sql_query(Conn, "delete from message where msg_id = " ++
+ MsgIdStr);
+ _ -> odbc:sql_query(Conn, "update message set ref_count = " ++
+ integer_to_list(RefCount - 1) ++ " where msg_id = " ++
+ MsgIdStr)
+ end,
+ if LedgerDelete ->
+ odbc:sql_query(Conn, "delete from ledger where queue = " ++
+ QStr ++ " and seq_id = " ++ integer_to_list(SeqId));
+ true -> ok
+ end
+ end, MsgSeqIds),
+ odbc:commit(Conn, commit),
+ {ok, State}.
+
+internal_tx_publish(MsgId, MsgBody, State = #dbstate { db_conn = Conn }) ->
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ MsgStr = binary_to_escaped_string(MsgBody),
+ case odbc:sql_query(Conn, "select ref_count from message where msg_id = " ++ MsgIdStr) of
+ {selected, _, []} ->
+ odbc:sql_query(Conn, "insert into message (msg_id, msg, ref_count) values (" ++
+ MsgIdStr ++ ", " ++ MsgStr ++ ", 1)");
+ {selected, _, [{RefCount}]} ->
+ odbc:sql_query(Conn, "update message set ref_count = " ++
+ integer_to_list(RefCount + 1) ++ " where msg_id = " ++ MsgIdStr)
+ end,
+ odbc:commit(Conn, commit),
+ {ok, State}.
+
+internal_tx_commit(Q, PubMsgIds, AckSeqIds, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ {InsertOrUpdate, NextWrite} =
+ case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of
+ {selected, _, []} -> {insert, 0};
+ {selected, _, [{NextWrite2}]} -> {update, NextWrite2}
+ end,
+ NextWrite3 =
+ lists:foldl(fun (MsgId, WriteSeqInteger) ->
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ odbc:sql_query(Conn,
+ "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++
+ QStr ++ ", " ++ integer_to_list(WriteSeqInteger) ++ ", false, " ++
+ MsgIdStr ++ ")"),
+ WriteSeqInteger + 1
+ end, NextWrite, PubMsgIds),
+ case InsertOrUpdate of
+ update -> odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(NextWrite3) ++
+ " where queue = " ++ QStr);
+ insert -> odbc:sql_query(Conn, "insert into sequence (queue, next_read, next_write) values (" ++
+ QStr ++ ", 0, " ++ integer_to_list(NextWrite3) ++ ")")
+ end,
+ odbc:commit(Conn, commit),
+ remove_messages(Q, AckSeqIds, true, State),
+ {ok, State}.
+
+internal_publish(Q, MsgId, MsgBody, State = #dbstate { db_conn = Conn }) ->
+ {ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
+ MsgIdStr = binary_to_escaped_string(term_to_binary(MsgId)),
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ NextWrite =
+ case odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr) of
+ {selected, _, []} ->
+ odbc:sql_query(Conn,
+ "insert into sequence (queue, next_read, next_write) values (" ++
+ QStr ++ ", 0, 1)"),
+ 0;
+ {selected, _, [{NextWrite2}]} ->
+ odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(1 + NextWrite2) ++
+ " where queue = " ++ QStr),
+ NextWrite2
+ end,
+ odbc:sql_query(Conn, "insert into ledger (queue, seq_id, is_delivered, msg_id) values (" ++
+ QStr ++ ", " ++ integer_to_list(NextWrite) ++ ", false, " ++ MsgIdStr ++ ")"),
+ odbc:commit(Conn, commit),
+ {ok, State1}.
+
+internal_tx_cancel(MsgIds, State) ->
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
+ remove_messages(undefined, MsgSeqIds, false, State).
+
+internal_requeue(Q, MsgSeqIds, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ {selected, _, [{WriteSeqId}]} =
+ odbc:sql_query(Conn, "select next_write from sequence where queue = " ++ QStr),
+ WriteSeqId2 =
+ lists:foldl(
+ fun ({_MsgId, SeqId}, NextWriteSeqId) ->
+ odbc:sql_query(Conn, "update ledger set seq_id = " ++ integer_to_list(NextWriteSeqId) ++
+ " where seq_id = " ++ integer_to_list(SeqId) ++ " and queue = " ++ QStr),
+ NextWriteSeqId + 1
+ end, WriteSeqId, MsgSeqIds),
+ odbc:sql_query(Conn, "update sequence set next_write = " ++ integer_to_list(WriteSeqId2) ++
+ " where queue = " ++ QStr),
+ odbc:commit(Conn, commit),
+ {ok, State}.
+
+
+compact_already_delivered(#dbstate { db_conn = Conn }) ->
+ {selected, _, Seqs} = odbc:sql_query(Conn, "select queue, next_read from sequence"),
+ lists:foreach(
+ fun ({QHexStr, ReadSeqId}) ->
+ Q = binary_to_term(hex_string_to_binary(QHexStr)),
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ case odbc:sql_query(Conn, "select min(seq_id) from ledger where queue = "
+ ++ QStr) of
+ {selected, _, []} -> ok;
+ {selected, _, [{null}]} -> ok; %% AGH!
+ {selected, _, [{Min}]} ->
+ Gap = shuffle_up(Conn, QStr, Min - 1, ReadSeqId - 1, 0),
+ odbc:sql_query(Conn, "update sequence set next_read = " ++
+ integer_to_list(Min + Gap) ++
+ " where queue = " ++ QStr)
+ end
+ end, Seqs),
+ odbc:commit(Conn, commit).
+
+shuffle_up(_Conn, _QStr, SeqId, SeqId, Gap) ->
+ Gap;
+shuffle_up(Conn, QStr, BaseSeqId, SeqId, Gap) ->
+ GapInc =
+ case odbc:sql_query(Conn, "select count(1) from ledger where queue = " ++
+ QStr ++ " and seq_id = " ++ integer_to_list(SeqId)) of
+ {selected, _, [{"0"}]} ->
+ 1;
+ {selected, _, [{"1"}]} ->
+ if Gap =:= 0 -> ok;
+ true -> odbc:sql_query(Conn, "update ledger set seq_id = " ++
+ integer_to_list(SeqId + Gap) ++ " where seq_id = " ++
+ integer_to_list(SeqId) ++ " and queue = " ++ QStr)
+ end,
+ 0
+ end,
+ shuffle_up(Conn, QStr, BaseSeqId, SeqId - 1, Gap + GapInc).
diff --git a/src/rabbit_db_queue_schema.sql b/src/rabbit_db_queue_schema.sql
new file mode 100644
index 0000000000..f5c49e8d4f
--- /dev/null
+++ b/src/rabbit_db_queue_schema.sql
@@ -0,0 +1,22 @@
+create table message (
+ msg_id bytea PRIMARY KEY,
+ msg bytea,
+ ref_count integer NOT NULL
+);
+create index message_msg_id_index on message (msg_id);
+
+create table sequence (
+ queue bytea PRIMARY KEY,
+ next_read integer NOT NULL,
+ next_write integer NOT NULL
+);
+create index sequence_queue_index on sequence (queue);
+
+create table ledger (
+ queue bytea NOT NULL,
+ seq_id integer NOT NULL,
+ is_delivered boolean NOT NULL,
+ msg_id bytea NOT NULL
+);
+create index ledger_queue_seq_id_index on ledger (queue, seq_id);
+