summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-22 14:33:21 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-22 14:33:21 +0100
commite47333131fc5c3d196fdad432cd1573ff486feb5 (patch)
treefaf2d17365ff3a5a01f7680afc44fa48e826190a
parent880d998792b91272b38e3cba7a7a188d4af4704d (diff)
parentfb5492875cd3581d02c4a7ffba597f8e3471802d (diff)
downloadrabbitmq-server-e47333131fc5c3d196fdad432cd1573ff486feb5.tar.gz
Merging heads of default
-rw-r--r--src/file_handle_cache.erl367
-rw-r--r--src/rabbit_exchange_type_topic.erl2
-rw-r--r--src/rabbit_msg_store.erl5
-rw-r--r--src/rabbit_tests.erl1
4 files changed, 224 insertions, 151 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index dd693cb4..62a8af5d 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -130,7 +130,7 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, transfer/1]).
+-export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -149,6 +149,7 @@
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)).
+-define(CLIENT_ETS_TABLE, ?MODULE).
%%----------------------------------------------------------------------------
@@ -182,11 +183,19 @@
obtain_limit,
obtain_count,
obtain_pending,
- callbacks,
- counts,
+ clients,
timer_ref
}).
+-record(cstate,
+ { pid,
+ callback,
+ opened,
+ obtained,
+ blocked,
+ pending_closes
+ }).
+
%%----------------------------------------------------------------------------
%% Specs
%%----------------------------------------------------------------------------
@@ -224,6 +233,8 @@
-spec(clear/1 :: (ref()) -> ok_or_error()).
-spec(obtain/0 :: () -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
+-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
+-spec(get_limit/0 :: () -> non_neg_integer()).
-endif.
@@ -451,6 +462,12 @@ obtain() ->
transfer(Pid) ->
gen_server:cast(?SERVER, {transfer, self(), Pid}).
+set_limit(Limit) ->
+ gen_server:call(?SERVER, {set_limit, Limit}, infinity).
+
+get_limit() ->
+ gen_server:call(?SERVER, get_limit, infinity).
+
%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
@@ -536,8 +553,7 @@ age_tree_insert(Now, Ref) ->
Tree = get_age_tree(),
Tree1 = gb_trees:insert(Now, Ref, Tree),
{Oldest, _Ref} = gb_trees:smallest(Tree1),
- case gen_server:call(?SERVER, {open, self(), Oldest,
- not gb_trees:is_empty(Tree)}, infinity) of
+ case gen_server:call(?SERVER, {open, self(), Oldest}, infinity) of
ok ->
put_age_tree(Tree1);
close ->
@@ -728,12 +744,10 @@ init([]) ->
_ ->
ulimit()
end,
- ObtainLimit = case Limit of
- infinity -> infinity;
- _ -> ?OBTAIN_LIMIT(Limit)
- end,
+ ObtainLimit = obtain_limit(Limit),
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
[Limit, ObtainLimit]),
+ Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
{ok, #fhc_state { elders = dict:new(),
limit = Limit,
open_count = 0,
@@ -741,113 +755,122 @@ init([]) ->
obtain_limit = ObtainLimit,
obtain_count = 0,
obtain_pending = [],
- callbacks = dict:new(),
- counts = dict:new(),
+ clients = Clients,
timer_ref = undefined }}.
-handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
+handle_call({open, Pid, EldestUnusedSince}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
- elders = Elders }) ->
+ elders = Elders,
+ clients = Clients })
+ when EldestUnusedSince =/= undefined ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
Item = {open, Pid, From},
- case maybe_reduce(ensure_mref(Pid, State #fhc_state {
- open_count = Count + 1,
- elders = Elders1 })) of
- {true, State1} ->
- State2 = State1 #fhc_state { open_count = Count },
- case CanClose of
- true -> {reply, close, State2};
- false -> {noreply, State2 #fhc_state {
- open_pending = [Item | Pending],
- elders = dict:erase(Pid, Elders1) }}
- end;
- {false, State1} ->
- {noreply, run_pending_item(Item, State1)}
+ ok = track_client(Pid, Clients),
+ State1 = State #fhc_state { elders = Elders1 },
+ case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of
+ true -> case ets:lookup(Clients, Pid) of
+ [#cstate { opened = 0 }] ->
+ true = ets:update_element(
+ Clients, Pid, {#cstate.blocked, true}),
+ {noreply,
+ reduce(State1 #fhc_state {
+ open_pending = [Item | Pending] })};
+ [#cstate { opened = N }] ->
+ true = ets:update_element(
+ Clients, Pid, {#cstate.pending_closes, N}),
+ {reply, close, State1}
+ end;
+ false -> {noreply, run_pending_item(Item, State1)}
end;
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
obtain_count = Count,
obtain_pending = Pending,
- elders = Elders })
+ clients = Clients })
when Limit =/= infinity andalso Count >= Limit ->
+ ok = track_client(Pid, Clients),
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
Item = {obtain, Pid, From},
- {noreply, ensure_mref(Pid, State #fhc_state {
- obtain_pending = [Item | Pending],
- elders = dict:erase(Pid, Elders) })};
+ {noreply, State #fhc_state { obtain_pending = [Item | Pending] }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
- elders = Elders }) ->
+ clients = Clients }) ->
Item = {obtain, Pid, From},
- case maybe_reduce(ensure_mref(Pid, State #fhc_state {
- obtain_count = Count + 1 })) of
- {true, State1} ->
- {noreply, State1 #fhc_state {
- obtain_count = Count,
- obtain_pending = [Item | Pending],
- elders = dict:erase(Pid, Elders) }};
- {false, State1} ->
- {noreply, run_pending_item(Item, State1)}
- end.
+ ok = track_client(Pid, Clients),
+ case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ true ->
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
+ {noreply,
+ reduce(State #fhc_state {obtain_pending = [Item | Pending] })};
+ false ->
+ {noreply, run_pending_item(Item, State)}
+ end;
+handle_call({set_limit, Limit}, _From, State) ->
+ {reply, ok, maybe_reduce(
+ process_pending(State #fhc_state {
+ limit = Limit,
+ obtain_limit = obtain_limit(Limit) }))};
+handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
+ {reply, Limit, State}.
handle_cast({register_callback, Pid, MFA},
- State = #fhc_state { callbacks = Callbacks }) ->
- {noreply, ensure_mref(
- Pid, State #fhc_state {
- callbacks = dict:store(Pid, MFA, Callbacks) })};
-
-handle_cast({update, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders }) ->
+ State = #fhc_state { clients = Clients }) ->
+ ok = track_client(Pid, Clients),
+ true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}),
+ {noreply, State};
+
+handle_cast({update, Pid, EldestUnusedSince},
+ State = #fhc_state { elders = Elders })
+ when EldestUnusedSince =/= undefined ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
{noreply, State #fhc_state { elders = Elders1 }};
handle_cast({close, Pid, EldestUnusedSince},
- State = #fhc_state { open_count = Count,
- counts = Counts,
- elders = Elders }) ->
+ State = #fhc_state { elders = Elders, clients = Clients }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- Counts1 = update_counts(open, Pid, -1, Counts),
- {noreply, process_pending(State #fhc_state { open_count = Count - 1,
- counts = Counts1,
- elders = Elders1 })};
+ ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
+ {noreply, process_pending(
+ update_counts(open, Pid, -1,
+ State #fhc_state { elders = Elders1 }))};
handle_cast({transfer, FromPid, ToPid}, State) ->
- State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State),
- Counts1 = update_counts(obtain, FromPid, -1, Counts),
- Counts2 = update_counts(obtain, ToPid, +1, Counts1),
- {noreply, process_pending(State1 #fhc_state { counts = Counts2 })};
+ ok = track_client(ToPid, State#fhc_state.clients),
+ {noreply, process_pending(
+ update_counts(obtain, ToPid, +1,
+ update_counts(obtain, FromPid, -1, State)))};
handle_cast(check_counts, State) ->
- {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
- {noreply, State1}.
-
-handle_info({'DOWN', _MRef, process, Pid, _Reason}, State =
- #fhc_state { open_count = OpenCount,
- open_pending = OpenPending,
- obtain_count = ObtainCount,
- obtain_pending = ObtainPending,
- callbacks = Callbacks,
- counts = Counts,
- elders = Elders }) ->
+ {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #fhc_state { elders = Elders,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_pending = ObtainPending,
+ clients = Clients }) ->
FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end,
OpenPending1 = lists:filter(FilterFun, OpenPending),
ObtainPending1 = lists:filter(FilterFun, ObtainPending),
- {Opened, Obtained} = dict:fetch(Pid, Counts),
- {noreply, process_pending(State #fhc_state {
- open_count = OpenCount - Opened,
- open_pending = OpenPending1,
- obtain_count = ObtainCount - Obtained,
- obtain_pending = ObtainPending1,
- callbacks = dict:erase(Pid, Callbacks),
- counts = dict:erase(Pid, Counts),
- elders = dict:erase(Pid, Elders) })}.
-
-terminate(_Reason, State) ->
+ [#cstate { opened = Opened, obtained = Obtained }] =
+ ets:lookup(Clients, Pid),
+ true = ets:delete(Clients, Pid),
+ {noreply, process_pending(
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = OpenPending1,
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = ObtainPending1,
+ elders = dict:erase(Pid, Elders) })}.
+
+terminate(_Reason, State = #fhc_state { clients = Clients }) ->
+ ets:delete(Clients),
State.
code_change(_OldVsn, State, _Extra) ->
@@ -857,19 +880,21 @@ code_change(_OldVsn, State, _Extra) ->
%% server helpers
%%----------------------------------------------------------------------------
+obtain_limit(infinity) -> infinity;
+obtain_limit(Limit) -> ?OBTAIN_LIMIT(Limit).
+
process_pending(State = #fhc_state { limit = infinity }) ->
State;
process_pending(State) ->
- process_obtain(process_open(State)).
+ process_open(process_obtain(State)).
process_open(State = #fhc_state { limit = Limit,
open_pending = Pending,
open_count = OpenCount,
obtain_count = ObtainCount }) ->
- {Pending1, Inc, State1} =
+ {Pending1, State1} =
process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
- State1 #fhc_state { open_pending = Pending1,
- open_count = OpenCount + Inc }.
+ State1 #fhc_state { open_pending = Pending1 }.
process_obtain(State = #fhc_state { limit = Limit,
obtain_pending = Pending,
@@ -878,78 +903,132 @@ process_obtain(State = #fhc_state { limit = Limit,
open_count = OpenCount }) ->
Quota = lists:min([ObtainLimit - ObtainCount,
Limit - (ObtainCount + OpenCount)]),
- {Pending1, Inc, State1} = process_pending(Pending, Quota, State),
- State1 #fhc_state { obtain_pending = Pending1,
- obtain_count = ObtainCount + Inc }.
+ {Pending1, State1} = process_pending(Pending, Quota, State),
+ State1 #fhc_state { obtain_pending = Pending1 }.
process_pending([], _Quota, State) ->
- {[], 0, State};
+ {[], State};
process_pending(Pending, Quota, State) when Quota =< 0 ->
- {Pending, 0, State};
-process_pending(Pending, Quota, State = #fhc_state { counts = Counts }) ->
+ {Pending, State};
+process_pending(Pending, Quota, State) ->
PendingLen = length(Pending),
SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
- Counts1 = lists:foldl(fun run_pending_item1/2, Counts, SatisfiableRev),
- {PendingNew, SatisfiableLen, State #fhc_state { counts = Counts1 }}.
+ State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev),
+ {PendingNew, State1}.
-run_pending_item(Item, State = #fhc_state { counts = Counts }) ->
- State #fhc_state { counts = run_pending_item1(Item, Counts) }.
-
-run_pending_item1({Kind, Pid, From}, Counts) ->
+run_pending_item({Kind, Pid, From}, State = #fhc_state { clients = Clients }) ->
gen_server:reply(From, ok),
- update_counts(Kind, Pid, +1, Counts).
-
-update_counts(open, Pid, Delta, Counts) ->
- dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end,
- Counts);
-update_counts(obtain, Pid, Delta, Counts) ->
- dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end,
- Counts).
-
-maybe_reduce(State = #fhc_state { limit = Limit,
- open_count = OpenCount,
- open_pending = OpenPending,
- obtain_count = ObtainCount,
- obtain_limit = ObtainLimit,
- obtain_pending = ObtainPending,
- elders = Elders,
- callbacks = Callbacks,
- timer_ref = TRef })
- when Limit =/= infinity andalso
- (((OpenCount + ObtainCount) > Limit) orelse
- (OpenPending =/= []) orelse
- (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) ->
+ true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
+ update_counts(Kind, Pid, +1, State).
+
+update_counts(Kind, Pid, Delta,
+ State = #fhc_state { open_count = OpenCount,
+ obtain_count = ObtainCount,
+ clients = Clients }) ->
+ {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients),
+ State #fhc_state { open_count = OpenCount + OpenDelta,
+ obtain_count = ObtainCount + ObtainDelta }.
+
+update_counts1(open, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
+ {Delta, 0};
+update_counts1(obtain, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}),
+ {0, Delta}.
+
+maybe_reduce(State) ->
+ case needs_reduce(State) of
+ true -> reduce(State);
+ false -> State
+ end.
+
+needs_reduce(#fhc_state { limit = Limit,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_limit = ObtainLimit,
+ obtain_pending = ObtainPending }) ->
+ Limit =/= infinity
+ andalso ((OpenCount + ObtainCount > Limit)
+ orelse (OpenPending =/= [])
+ orelse (ObtainCount < ObtainLimit
+ andalso ObtainPending =/= [])).
+
+reduce(State = #fhc_state { open_pending = OpenPending,
+ obtain_pending = ObtainPending,
+ elders = Elders,
+ clients = Clients,
+ timer_ref = TRef }) ->
Now = now(),
- {Pids, Sum, ClientCount} =
- dict:fold(fun (_Pid, undefined, Accs) ->
- Accs;
- (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
- {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
- CountAcc + 1}
+ {CStates, Sum, ClientCount} =
+ dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
+ [#cstate { pending_closes = PendingCloses,
+ opened = Opened,
+ blocked = Blocked } = CState] =
+ ets:lookup(Clients, Pid),
+ case Blocked orelse PendingCloses =:= Opened of
+ true -> Accs;
+ false -> {[CState | CStatesAcc],
+ SumAcc + timer:now_diff(Now, Eldest),
+ CountAcc + 1}
+ end
end, {[], 0, 0}, Elders),
- case Pids of
+ case CStates of
[] -> ok;
- _ -> AverageAge = Sum / ClientCount,
- lists:foreach(
- fun (Pid) ->
- case dict:find(Pid, Callbacks) of
- error -> ok;
- {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge])
- end
- end, Pids)
+ _ -> case (Sum / ClientCount) -
+ (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ AverageAge when AverageAge > 0 ->
+ notify_age(CStates, AverageAge);
+ _ ->
+ notify_age0(Clients, CStates,
+ length(OpenPending) + length(ObtainPending))
+ end
end,
- AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit,
case TRef of
undefined -> {ok, TRef1} = timer:apply_after(
?FILE_HANDLES_CHECK_INTERVAL,
gen_server, cast, [?SERVER, check_counts]),
- {AboveLimit, State #fhc_state { timer_ref = TRef1 }};
- _ -> {AboveLimit, State}
- end;
-maybe_reduce(State) ->
- {false, State}.
+ State #fhc_state { timer_ref = TRef1 };
+ _ -> State
+ end.
+
+notify_age(CStates, AverageAge) ->
+ lists:foreach(
+ fun (#cstate { callback = undefined }) -> ok;
+ (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge])
+ end, CStates).
+
+notify_age0(Clients, CStates, Required) ->
+ Notifications =
+ [CState || CState <- CStates, CState#cstate.callback =/= undefined],
+ {L1, L2} = lists:split(random:uniform(length(Notifications)),
+ Notifications),
+ notify(Clients, Required, L2 ++ L1).
+
+notify(_Clients, _Required, []) ->
+ ok;
+notify(_Clients, Required, _Notifications) when Required =< 0 ->
+ ok;
+notify(Clients, Required, [#cstate{ pid = Pid,
+ callback = {M, F, A},
+ opened = Opened } | Notifications]) ->
+ apply(M, F, A ++ [0]),
+ ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
+ notify(Clients, Required - Opened, Notifications).
+
+track_client(Pid, Clients) ->
+ case ets:insert_new(Clients, #cstate { pid = Pid,
+ callback = undefined,
+ opened = 0,
+ obtained = 0,
+ blocked = false,
+ pending_closes = 0 }) of
+ true -> _MRef = erlang:monitor(process, Pid),
+ ok;
+ false -> ok
+ end.
%% For all unices, assume ulimit exists. Further googling suggests
%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
@@ -980,11 +1059,3 @@ ulimit() ->
_ ->
?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
end.
-
-ensure_mref(Pid, State = #fhc_state { counts = Counts }) ->
- case dict:find(Pid, Counts) of
- {ok, _} -> State;
- error -> _MRef = erlang:monitor(process, Pid),
- State #fhc_state {
- counts = dict:store(Pid, {0, 0}, Counts) }
- end.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 89b2441e..e796acf3 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -67,7 +67,7 @@ publish(#exchange{name = Name}, Delivery =
Delivery).
split_topic_key(Key) ->
- re:split(Key, "\\.", [{return, list}]).
+ string:tokens(binary_to_list(Key), ".").
topic_matches(PatternKey, RoutingKey) ->
P = split_topic_key(PatternKey),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 5bc1f9d5..207ddcb8 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -317,7 +317,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
write(Server, Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {gen_server2:cast(Server, {write, Guid, Msg}), CState}.
+ {gen_server2:cast(Server, {write, Guid}), CState}.
read(Server, Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
@@ -611,7 +611,7 @@ handle_call({delete_client, CRef}, _From,
reply(ok,
State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
-handle_cast({write, Guid, Msg},
+handle_cast({write, Guid},
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
sum_valid_data = SumValid,
@@ -619,6 +619,7 @@ handle_cast({write, Guid, Msg},
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
+ [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
case index_lookup(Guid, State) of
not_found ->
%% New message, lots to do
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c07055af..3c90fefa 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -55,6 +55,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
+ ok = file_handle_cache:set_limit(10),
passed = test_backing_queue(),
passed = test_priority_queue(),
passed = test_bpqueue(),