summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_decorator.erl
blob: adfe0c7faec0d6e79aa811ee295bd25baeb9b476 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
-module(rabbit_queue_decorator).

-include("rabbit.hrl").

-export([select/1, set/1, register/2, unregister/1]).

%%----------------------------------------------------------------------------

-ifdef(use_specs).

-callback startup(rabbit_types:amqqueue()) -> 'ok'.

-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.

-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
    'ok'.

-callback active_for(rabbit_types:amqqueue()) -> boolean().

%% called with Queue, MaxActivePriority, IsEmpty
-callback consumer_state_changed(
            rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'.

-else.

-export([behaviour_info/1]).

behaviour_info(callbacks) ->
    [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
     {active_for, 1}, {consumer_state_changed, 3}];
behaviour_info(_Other) ->
    undefined.

-endif.

%%----------------------------------------------------------------------------

select(Modules) ->
    [M || M <- Modules, code:which(M) =/= non_existing].

set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.

list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].

register(TypeName, ModuleName) ->
    rabbit_registry:register(queue_decorator, TypeName, ModuleName),
    [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
    ok.

unregister(TypeName) ->
    rabbit_registry:unregister(queue_decorator, TypeName),
    [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
    ok.

maybe_recover(Q = #amqqueue{name       = Name,
                            decorators = Decs}) ->
    #amqqueue{decorators = Decs1} = set(Q),
    Old = lists:sort(select(Decs)),
    New = lists:sort(select(Decs1)),
    case New of
        Old -> ok;
        _   -> [M:startup(Q) || M <- New -- Old],
               rabbit_amqqueue:update_decorators(Name)
    end.