summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-02-06 13:51:26 +0000
committerMatthias Radestock <matthias@lshift.net>2009-02-06 13:51:26 +0000
commite7abe601e1807c11bcec0261672eb23abbd7a9c9 (patch)
tree8edeb6f18fe3e7b1f0a9534aa07527243b63eded
parentce3787ff42a492406b681c52eddc89e588f1111d (diff)
downloadrabbitmq-server-bug20348.tar.gz
keep serial in separate process, to avoid persister bottleneckbug20348
...and move all the guid functions into a separate module
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_channel.erl8
-rw-r--r--src/rabbit_guid.erl126
-rw-r--r--src/rabbit_misc.erl40
-rw-r--r--src/rabbit_persister.erl2
5 files changed, 135 insertions, 45 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 41064c77..97bbdd99 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -182,6 +182,10 @@ start(normal, []) ->
fun () ->
ok = start_child(rabbit_persister)
end},
+ {"guid generator",
+ fun () ->
+ ok = start_child(rabbit_guid)
+ end},
{"builtin applications",
fun () ->
{ok, DefaultVHost} = application:get_env(default_vhost),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ca2782c7..4bf2f446 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -265,7 +265,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
PersistentKey = case is_message_persistent(DecodedContent) of
- true -> rabbit_misc:guid();
+ true -> rabbit_guid:guid();
false -> none
end,
{noreply, publish(Mandatory, Immediate,
@@ -338,7 +338,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
ActualConsumerTag =
case ConsumerTag of
- <<>> -> rabbit_misc:binstring_guid("amq.ctag");
+ <<>> -> rabbit_guid:binstring_guid("amq.ctag");
Other -> Other
end,
@@ -550,7 +550,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{error, not_found} ->
ActualNameBin =
case QueueNameBin of
- <<>> -> rabbit_misc:binstring_guid("amq.gen");
+ <<>> -> rabbit_guid:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
@@ -751,7 +751,7 @@ ack(ProxyPid, TxnKey, UAQ) ->
[QPid | L]
end, [], UAQ).
-make_tx_id() -> rabbit_misc:guid().
+make_tx_id() -> rabbit_guid:guid().
new_tx(State) ->
State#ch{transaction_id = make_tx_id(),
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
new file mode 100644
index 00000000..51c1665b
--- /dev/null
+++ b/src/rabbit_guid.erl
@@ -0,0 +1,126 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_guid).
+
+-include("rabbit.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+-export([guid/0, string_guid/1, binstring_guid/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {serial}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(guid/0 :: () -> guid()).
+-spec(string_guid/1 :: (any()) -> string()).
+-spec(binstring_guid/1 :: (any()) -> binary()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ %% The persister can get heavily loaded, and we don't want that to
+ %% impact guid generation. We therefore keep the serial in a
+ %% separate process rather than calling rabbit_persister:serial/0
+ %% directly in the functions below.
+ gen_server:start_link({local, ?SERVER}, ?MODULE,
+ [rabbit_persister:serial()], []).
+
+%% generate a guid that is monotonically increasing per process.
+%%
+%% The id is only unique within a single cluster and as long as the
+%% persistent message store hasn't been deleted.
+guid() ->
+ %% We don't use erlang:now() here because a) it may return
+ %% duplicates when the system clock has been rewound prior to a
+ %% restart, or ids were generated at a high rate (which causes
+ %% now() to move ahead of the system time), and b) it is really
+ %% slow since it takes a global lock and makes a system call.
+ %%
+ %% rabbit_persister:serial/0, in combination with self/0 (which
+ %% includes the node name) uniquely identifies a process in space
+ %% and time. We combine that with a process-local counter to give
+ %% us a GUID that is monotonically increasing per process.
+ G = case get(guid) of
+ undefined -> {{gen_server:call(?SERVER, serial), self()}, 0};
+ {S, I} -> {S, I+1}
+ end,
+ put(guid, G),
+ G.
+
+%% generate a readable string representation of a guid. Note that any
+%% monotonicity of the guid is not preserved in the encoding.
+string_guid(Prefix) ->
+ %% we use the (undocumented) ssl_base64 module here because it is
+ %% present throughout OTP R11 and R12 whereas base64 only becomes
+ %% available in R11B-4.
+ %%
+ %% TODO: once debian stable and EPEL have moved from R11B-2 to
+ %% R11B-4 or later we should change this to use base64.
+ Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))).
+
+binstring_guid(Prefix) ->
+ list_to_binary(string_guid(Prefix)).
+
+%%----------------------------------------------------------------------------
+
+init([Serial]) ->
+ {ok, #state{serial = Serial}}.
+
+handle_call(serial, _From, State = #state{serial = Serial}) ->
+ {reply, Serial, State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 973e163b..053bde54 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,7 +46,6 @@
-export([ensure_ok/2]).
-export([localnode/1, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
--export([guid/0, string_guid/1, binstring_guid/1]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
@@ -98,9 +97,6 @@
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(guid/0 :: () -> guid()).
--spec(string_guid/1 :: (any()) -> string()).
--spec(binstring_guid/1 :: (any()) -> binary()).
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
@@ -299,42 +295,6 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
-%% generate a guid that is monotonically increasing per process.
-%%
-%% The id is only unique within a single cluster and as the persistent
-%% message store hasn't been deleted.
-guid() ->
- %% We don't use erlang:now() here because a) it may return
- %% duplicates when the system clock has been rewound prior to a
- %% restart, or ids were generated at a high rate (which causes
- %% now() to move ahead of the system time), and b) it is really
- %% slow since it takes a global lock and makes a system call.
- %%
- %% rabbit_persister:serial/0, in combination with self/0 (which
- %% includes the node name) uniquely identifies a process in space
- %% and time. We combine that with a process-local counter to give
- %% us a GUID that is monotonically increasing per process.
- G = case get(guid) of
- undefined -> {{rabbit_persister:serial(), self()}, 0};
- {S, I} -> {S, I+1}
- end,
- put(guid, G),
- G.
-
-%% generate a readable string representation of a guid. Note that any
-%% monotonicity of the guid is not preserved in the encoding.
-string_guid(Prefix) ->
- %% we use the (undocumented) ssl_base64 module here because it is
- %% present throughout OTP R11 and R12 whereas base64 only becomes
- %% available in R11B-4.
- %%
- %% TODO: once debian stable and EPEL have moved from R11B-2 to
- %% R11B-4 or later we should change this to use base64.
- Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))).
-
-binstring_guid(Prefix) ->
- list_to_binary(string_guid(Prefix)).
-
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index c34ad851..94033a4f 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -92,7 +92,7 @@ start_link() ->
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
- TxnKey = rabbit_misc:guid(),
+ TxnKey = rabbit_guid:guid(),
gen_server:call(?SERVER, {transaction, TxnKey, MessageList}).
extend_transaction(TxnKey, MessageList) ->