diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2016-03-15 15:10:45 -0700 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2016-03-15 15:10:45 -0700 |
commit | 18da7cf8eee3247a8b528bbf4dd700b45277a147 (patch) | |
tree | da54a9f6073fe9095c0e86d67511f0648ace00fd | |
parent | d1670d49c7c45dbbe7e23daa1bf02d1e39a75c2e (diff) | |
download | rvi_core-18da7cf8eee3247a8b528bbf4dd700b45277a147.tar.gz |
cache authorizations for better performance
-rw-r--r-- | components/authorize/src/authorize_keys.erl | 189 | ||||
-rw-r--r-- | components/authorize/src/authorize_rpc.erl | 193 | ||||
-rw-r--r-- | components/dlink_bt/src/bt_connection_manager.erl | 55 | ||||
-rw-r--r-- | components/dlink_sms/src/sms_connection_manager.erl | 53 | ||||
-rw-r--r-- | components/dlink_tcp/src/connection_manager.erl | 46 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_connmgr.erl | 42 |
6 files changed, 284 insertions, 294 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index c2368d5..b5ffd72 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -7,7 +7,7 @@ get_pub_key/1, provisioning_key/0, signed_public_key/2, - save_keys/2, + %% save_keys/2, save_cred/5]). -export([get_credentials/0, get_credentials/1]). @@ -18,6 +18,13 @@ -export([public_key_to_json/1, json_to_public_key/1]). +-export([cache_authorizations/1, + remove_cached_authorizations/1, + remove_cached_authorizations_for_conn/1, + update_authorization_cache/2]). + +-export([remove_connection/1]). + -export([self_signed_public_key/0]). % just temporary -export([strip_nl/1]). -export([pp_key/1, @@ -41,13 +48,6 @@ dev_cert, cred_dir}). -%% -record(cert, {id, -%% register = [], -%% invoke = [], -%% validity = [], -%% jwt, -%% cert}). - -record(cred, {id, right_to_receive = [], right_to_invoke = [], @@ -56,11 +56,8 @@ jwt, cred}). --record(key, {id, - key}). - -define(CREDS, authorize_creds). --define(KEYS, authorize_keys). +-define(CACHE, authorize_cache). public_key_to_json(#'RSAPublicKey'{modulus = N, publicExponent = E}) -> [ @@ -124,8 +121,16 @@ get_device_key() -> validate_message(JWT, Conn) -> gen_server:call(?MODULE, {validate_message, JWT, Conn}). -validate_service_call(Service, Conn) -> - gen_server:call(?MODULE, {validate_service_call, Service, Conn}). +validate_service_call(Service, Conn0) -> + Conn = normalize_conn(Conn0), + case ets:lookup(?CACHE, {Service, Conn}) of + [{_, Res}] -> + ?debug("cached validation (~p): ~p", [{Service, Conn}, Res]), + Res; + [] -> + ?debug("no cached validation (~p)", [{Service, Conn}]), + gen_server:call(?MODULE, {validate_service_call, Service, Conn}) + end. get_credentials() -> get_credentials(local). @@ -142,12 +147,24 @@ find_cred_by_service(Service) -> provisioning_key() -> gen_server:call(?MODULE, provisioning_key). -save_keys(Keys, Conn) -> - gen_server:call(?MODULE, {save_keys, Keys, Conn}). - save_cred(Cred, JWT, Conn, PeerCert, LogId) -> gen_server:call(?MODULE, {save_cred, Cred, JWT, Conn, PeerCert, LogId}). +cache_authorizations(Svcs) -> + gen_server:cast(?MODULE, {cache_authorizations, Svcs}). + +remove_cached_authorizations(Svcs) -> + gen_server:cast(?MODULE, {remove_cached_authorizations, Svcs}). + +remove_cached_authorizations_for_conn(Conn) -> + remove_cached_authorizations_for_conn_(normalize_conn(Conn)). + +update_authorization_cache(Conn, CS) -> + gen_server:cast(?MODULE, {update_authorization_cache, Conn, CS}). + +remove_connection(Conn) -> + gen_server:cast(?MODULE, {remove_connection, Conn}). + %% Gen_server functions start_link() -> @@ -155,7 +172,7 @@ start_link() -> case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of {ok, Pid} = Ok -> ets:give_away(?CREDS, Pid, undefined), - ets:give_away(?KEYS, Pid, undefined), + %% ets:give_away(?KEYS, Pid, undefined), Ok; Other -> Other @@ -191,16 +208,7 @@ handle_call_(provisioning_key, _, S) -> handle_call_({get_credentials, Conn}, _, S) -> Creds = creds_by_conn(normalize_conn(Conn)), {reply, Creds, S}; -handle_call_({save_keys, Keys, Conn0}, _, S) -> - Conn = normalize_conn(Conn0), - ?debug("save_keys: Keys=~p, Conn=~p~n", [abbrev_k(Keys), Conn]), - save_keys_(Keys, Conn), - {reply, ok, S}; -handle_call_({validate_message, JWT, Conn0}, _, S) -> - Conn = normalize_conn(Conn0), - {reply, validate_message_(JWT, Conn), S}; -handle_call_({validate_service_call, Svc, Conn0}, _, S) -> - Conn = normalize_conn(Conn0), +handle_call_({validate_service_call, Svc, Conn}, _, S) -> {reply, validate_service_call_(Svc, Conn), S}; handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn0, PeerCert, LogId}, _, S) -> Conn = normalize_conn(Conn0), @@ -228,6 +236,21 @@ handle_call_({find_cred_by_service, Service} = R, _From, State) -> handle_call_(_, _, S) -> {reply, error, S}. +handle_cast({cache_authorizations, Svcs}, S) -> + cache_authorizations_(Svcs), + {noreply, S}; +handle_cast({remove_cached_authorizations, Svcs}, S) -> + remove_cached_authorizations_(Svcs), + {noreply, S}; +handle_cast({update_authorization_cache, Conn0, CS}, S) -> + Conn = normalize_conn(Conn0), + update_authorization_cache_(Conn, CS), + {noreply, S}; +handle_cast({remove_connection, Conn0}, S) -> + Conn = normalize_conn(Conn0), + ets:select_delete(?CACHE, [{ {{'_', Conn}, '_'}, [], [true] }]), + ets:select_delete(?CREDS, [{ {{Conn, '_'}, '_'}, [], [true] }]), + {noreply, S}; handle_cast(_, S) -> {noreply, S}. @@ -449,7 +472,8 @@ get_pub_key_from_cert_rec(#'Certificate'{ create_ets() -> create_ets(?CREDS, 1), - create_ets(?KEYS, #key.id). + %% create_ets(?KEYS, #key.id), + create_ets(?CACHE, 1). create_ets(Tab, KeyPos) -> case ets:info(Tab, name) of @@ -589,56 +613,73 @@ check_validity({Start, Stop}, UTC) -> check_validity(Start, Stop, UTC) -> (UTC > Start) andalso (UTC < Stop). -save_keys_(Keys, Conn) -> - lists:foreach( - fun(K) -> - save_key(K, Conn) - end, Keys). +validate_service_call_(Svc, Conn) -> + Res = + case lists:filter(fun(C) -> + can_invoke(Svc, C) + end, cred_recs_by_conn(Conn)) of + [] -> + invalid; + [#cred{id = ID}|_] -> + {ok, ID} + end, + ets:insert(?CACHE, {{Svc, Conn}, Res}), + Res. -save_key(K, Conn) -> - case json_to_public_key(K) of - undefined -> - ?warning("Unknown key type: ~p~n", [K]), - skip; - #'RSAPublicKey'{} = PubKey -> - KeyID = - case rvi_common:get_json_element(["kid"], K) of - {ok, ID} -> {Conn, ID}; - _ -> {Conn, make_ref()} - end, - ?debug("Saving key ~p, PubKey = ~p~n", [KeyID, pp_key(PubKey)]), - ets:insert(?KEYS, #key{id = KeyID, key = PubKey}) - end. +cache_authorizations_(Svcs) -> + CacheEntries = ets:foldl( + fun(CEntry, Acc) -> + lists:foldr( + fun(Svc, Acc1) -> + cache_authorization_entry( + CEntry, Svc, Acc1) + end, Acc, Svcs) + end, [], ?CREDS), + ets:insert(?CACHE, CacheEntries), + ?debug("auth cache: ~p", [ets:tab2list(?CACHE)]), + ok. -keys_by_conn(Conn) -> - ?debug("keys_by_conn(~p); all keys: ~p", - [Conn, ets:select(?KEYS, [{ #key{id = '$1', _='_'}, - [], ['$1'] }])]), - ets:select(?KEYS, [{ #key{id = {Conn,'$1'}, - key = '$2', _='_'}, [], [{{'$1', '$2'}}] }]). +cache_authorization_entry(Entry, Svc, Acc) -> + ?debug("cache_authorization_entry(~p, ~p)", [Entry, Svc]), + case {Entry, Acc} of + {{{Conn, _}, _C}, [{{Svc, Conn}, {ok,_}}|_]} -> + Acc; + {{{Conn, ID}, C}, Acc} -> + case can_invoke(Svc, C) of + true -> + case Acc of + [{{Svc, Conn}, invalid}|Rest] -> + [{{Svc, Conn}, {ok, ID}}|Rest]; + _ -> + [{{Svc, Conn}, {ok, ID}}|Acc] + end; + false -> + case Acc of + [{{Svc, Conn}, invalid}|_] -> + Acc; + _ -> + [{{Svc, Conn}, invalid}|Acc] + end + end + end. -validate_message_(JWT, Conn) -> - ?debug("validate_message_(~p, ~p) -> ~p~n", [JWT, Conn, keys_by_conn(Conn)]), - [_|_] = Keys = keys_by_conn(Conn), - validate_message_1(Keys, JWT). +remove_cached_authorizations_(Svc) -> + ets:select_delete(?CACHE, [{ {{Svc,'_'},'_'}, [], [true] }]), + ok. -validate_message_1([{_,K}|T], JWT) -> - case authorize_sig:decode_jwt(JWT, K) of - invalid -> - validate_message_1(T, JWT); - {_, Msg} -> - Msg - end; -validate_message_1([], _) -> - error(invalid). +update_authorization_cache_(Conn, CS) -> + remove_cached_authorizations_for_conn_(Conn), + [ok, Svcs] = service_discovery_rpc:get_all_services(CS), + ?debug("update authorization cache for ~p; Svs = ~p", [Conn, Svcs]), + lists:foreach( + fun(Svc) -> + validate_service_call_(Svc, Conn) + end, Svcs), + ?debug("auth cache: ~p", [ets:tab2list(?CACHE)]). -validate_service_call_(Svc, Conn) -> - case lists:filter(fun(C) -> can_invoke(Svc, C) end, cred_recs_by_conn(Conn)) of - [] -> - invalid; - [#cred{id = ID}|_] -> - {ok, ID} - end. +remove_cached_authorizations_for_conn_(Conn) -> + ets:select_delete(?CACHE, [{ {{'_', Conn}, '_'}, [], [true] }]), + ok. can_invoke(Svc, #cred{right_to_invoke = In}) -> lists:any(fun(I) -> match_svc(I, Svc) end, In). diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl index ba54f3a..9fa052a 100644 --- a/components/authorize/src/authorize_rpc.erl +++ b/components/authorize/src/authorize_rpc.erl @@ -21,14 +21,17 @@ -export([get_credentials/1, sign_message/2, validate_message/3, - validate_authorization/3, - validate_authorization/4, store_creds/3, store_creds/4, + remove_connection/2, authorize_local_message/3, authorize_remote_message/3]). -export([filter_by_service/3]). +%% for service_discovery notifications +-export([service_available/3, + service_unavailable/3]). + %% for testing & development -export([public_key/0, public_key_json/0, private_key/0]). @@ -53,6 +56,8 @@ init([]) -> {Priv, Pub} = authorize_keys:get_device_key(), ?debug("KeyPair = {~s, ~s}~n", [authorize_keys:pp_key(Priv), authorize_keys:pp_key(Pub)]), + CS = rvi_common:get_component_specification(), + service_discovery_rpc:subscribe(CS, ?MODULE), {ok, #st { cs = rvi_common:get_component_specification(), private_key = Priv, public_key = Pub} }. @@ -78,22 +83,9 @@ get_credentials(CompSpec) -> rvi_common:request(authorize, ?MODULE, get_credentials, [], [status, creds], CompSpec). -validate_authorization(CompSpec, JWT, Conn) -> - ?debug("authorize_rpc:validate_authorization():" - " Conn = ~p~n", [Conn]), - rvi_common:request(authorize, ?MODULE, validate_authorization, - [{jwt, JWT}, - {conn, Conn}], - [status], CompSpec). - -validate_authorization(CompSpec, JWT, Creds, Conn) -> - ?debug("authorize_rpc:validate_authorization():" - " Conn = ~p~n", [Conn]), - rvi_common:request(authorize, ?MODULE, validate_authorization, - [{jwt, JWT}, - {creds, Creds}, - {conn, Conn}], - [status], CompSpec). +remove_connection(CompSpec, Conn) -> + rvi_common:notification(authorize, ?MODULE, remove_connection, + [{conn, Conn}], [status], CompSpec). store_creds(CompSpec, Creds, Conn) -> store_creds(CompSpec, Creds, Conn, undefined). @@ -129,6 +121,14 @@ filter_by_service(CompSpec, Services, Conn) -> { conn, Conn }], [status, services], CompSpec). +service_available(CS, SvcName, _DLMod) -> + rvi_common:notification(authorize, ?MODULE, service_available, + [{service, SvcName}], CS). + +service_unavailable(CS, SvcName, _DLMod) -> + rvi_common:notification(authorize, ?MODULE, service_unavailable, + [{service, SvcName}], CS). + public_key() -> gen_server:call(?SERVER, public_key). @@ -140,7 +140,7 @@ private_key() -> %% JSON-RPC entry point %% CAlled by local exo http server -handle_rpc("sign_message", Args) -> +handle_rpc(<<"sign_message">>, Args) -> {ok, Message} = rvi_common:get_json_element(["message"], Args), LogId = rvi_common:get_json_log_id(Args), [ Status, JWT ] = @@ -148,7 +148,7 @@ handle_rpc("sign_message", Args) -> ?debug("Message signature = ~p~n", [JWT]), {ok, [ {status, rvi_common:json_rpc_status(Status)}, {jwt, JWT} ]}; -handle_rpc("validate_message", Args) -> +handle_rpc(<<"validate_message">>, Args) -> ?debug("validate_message; Args = ~p~n", [Args]), {ok, JWT} = rvi_common:get_json_element(["jwt"], Args), {ok, Conn} = rvi_common:get_json_element(["conn"], Args), @@ -157,24 +157,12 @@ handle_rpc("validate_message", Args) -> gen_server:call(?SERVER, { rvi, validate_message, [JWT, Conn, LogId] }), {ok, [ {status, rvi_common:json_rpc_status(Status)}, {message, Msg} ]}; -handle_rpc("get_credentials", Args) -> +handle_rpc(<<"get_credentials">>, Args) -> LogId = rvi_common:get_json_log_id(Args), [ Status | Rem ] = gen_server:call(?SERVER, { rvi, get_credentials, [LogId] }), {ok, [ rvi_common:json_rpc_status(Status) | Rem ] }; -handle_rpc("validate_authorization", Args) -> - {ok, JWT} = rvi_common:get_json_element(["jwt"], Args), - {ok, Conn} = rvi_common:get_json_element(["connection"], Args), - LogId = rvi_common:get_json_log_id(Args), - CmdArgs = - case rvi_common:get_json_element(["creds"], Args) of - {ok, Creds} -> [JWT, Creds, Conn, LogId]; - {error, _} -> [JWT, Conn, LogId] - end, - [ Status | Rem ] = - gen_server:call(?SERVER, {rvi, validate_authorization, CmdArgs}), - {ok, [ rvi_common:json_rpc_status(Status) | Rem] }; -handle_rpc("store_creds", Args) -> +handle_rpc(<<"store_creds">>, Args) -> {ok, Creds} = rvi_common:get_json_element(["creds"], Args), {ok, Conn} = rvi_common:get_json_element(["conn"], Args), {ok, PeerCert} = rvi_common:get_json_element(["peer_cert"], Args), @@ -182,7 +170,7 @@ handle_rpc("store_creds", Args) -> [ Status | Rem ] = gen_server:call(?SERVER, {rvi, store_creds, [Creds, Conn, PeerCert, LogId]}), {ok, [ rvi_common:json_rpc_status(Status) | Rem]}; -handle_rpc("authorize_local_message", Args) -> +handle_rpc(<<"authorize_local_message">>, Args) -> {ok, Service} = rvi_common:get_json_element(["service"], Args), {ok, Params} = rvi_common:get_json_element(["parameters"], Args), LogId = rvi_common:get_json_log_id(Args), @@ -193,7 +181,7 @@ handle_rpc("authorize_local_message", Args) -> { ok, [ rvi_common:json_rpc_status(Status) | Rem] }; -handle_rpc("authorize_remote_message", Args) -> +handle_rpc(<<"authorize_remote_message">>, Args) -> {ok, Service} = rvi_common:get_json_element(["service"], Args), {ok, Params} = rvi_common:get_json_element(["parameters"], Args), LogId = rvi_common:get_json_log_id(Args), @@ -201,7 +189,7 @@ handle_rpc("authorize_remote_message", Args) -> [Service, Params, LogId]}), { ok, rvi_common:json_rpc_status(Status)}; -handle_rpc("filter_by_service", Args) -> +handle_rpc(<<"filter_by_service">>, Args) -> ?debug("authorize_rpc:handle_rpc(\"filter_by_service\", ~p)~n", [Args]), {ok, Services} = rvi_common:get_json_element(["services"], Args), {ok, Conn} = rvi_common:get_json_element(["conn"], Args), @@ -217,12 +205,24 @@ handle_rpc(Other, _Args) -> { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }. +handle_notification(<<"service_available">>, Args) -> + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + gen_server:cast(?SERVER, {service_available, SvcName}), + ok; +handle_notification(<<"service_unavailable">>, Args) -> + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + gen_server:cast(?SERVER, {service_unavailable, SvcName}), + ok; +handle_notification(<<"remove_connection">>, Args) -> + {ok, Conn} = rvi_common:get_json_element(["conn"], Args), + gen_server:cast(?SERVER, {remove_connection, Conn}, Args), + ok; handle_notification(Other, _Args) -> ?debug("authorize_rpc:handle_other(~p): unknown", [ Other ]), ok. %% -%% Genserver implementation +%% Gen_server implementation %% handle_call({rvi, sign_message, [Msg | LogId]}, _, #st{private_key = Key} = State) -> Sign = authorize_sig:encode_jwt(Msg, Key), @@ -241,62 +241,19 @@ handle_call({rvi, validate_message, [JWT, Conn | LogId]}, _, State) -> handle_call({rvi, get_credentials, _Args}, _From, State) -> {reply, [ ok, authorize_keys:get_credentials() ], State}; -handle_call({rvi, validate_authorization, [JWT, Conn | [_] = LogId]}, _From, State) -> - %% The authorize JWT contains the public key used to sign the cred - ?debug( - "authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n", - []), - try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of - {_Header, Keys} -> - log(LogId, result, "auth jwt validated", []), - KeyStructs = get_json_element(["keys"], Keys, []), - authorize_keys:save_keys(KeyStructs, Conn), - {reply, [ok], State}; - invalid -> - ?warning("Invalid auth JWT from ~p~n", [Conn]), - log(LogId, error, "auth jwt INVALID", []), - {reply, [not_found], State} - catch - error:_Err -> - ?warning("Auth validation exception: ~p~n", [_Err]), - {reply, [not_found], State} - end; - -handle_call({rvi, validate_authorization, [JWT, Creds, Conn | [_] = LogId] }, _From, State) -> - %% The authorize JWT contains the public key used to sign the cred - ?debug( - "authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n", - []), - try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of - {_Header, Keys} -> - log(LogId, result, "auth jwt validated", []), - KeyStructs = get_json_element(["keys"], Keys, []), - ?debug("KeyStructs = ~p~n", [KeyStructs]), - authorize_keys:save_keys(KeyStructs, Conn), - do_store_creds(Creds, Conn, undefined, LogId), - {reply, [ok], State}; - invalid -> - ?warning("Invalid auth JWT from ~p~n", [Conn]), - log(LogId, error, "auth jwt INVALID", []), - {reply, [not_found], State} - catch - error:_Err -> - ?warning("Auth validation exception: ~p~n", [_Err]), - {reply, [not_found], State} - end; - handle_call({rvi, store_creds, [Creds, Conn, PeerCert | LogId]}, _From, State) -> - do_store_creds(Creds, Conn, PeerCert, LogId), + do_store_creds(Creds, Conn, PeerCert, LogId, State#st.cs), {reply, [ok], State}; + handle_call({rvi, authorize_local_message, [Service, _Params | LogId] } = R, _From, State) -> ?debug("authorize_rpc:handle_call(~p)~n", [R]), - case authorize_keys:find_cred_by_service(Service) of - {ok, {ID, _Cred}} -> - log(LogId, result, "auth msg: Cred=~s", [authorize_keys:abbrev_bin(ID)]), - {reply, [ok], State}; - _ -> - log(LogId, error, "NO CREDS for ~s", [Service]), - {reply, [ not_found ], State} + case authorize_keys:validate_service_call(Service, local) of + invalid -> + log(LogId, error, "local msg REJECTED", []), + {reply, [ not_found ], State}; + {ok, Id} -> + log(LogId, result, "local msg allowed: Cred=~s", [Id]), + {reply, [ok], State} end; handle_call({rvi, authorize_remote_message, [_Service, Params | LogId]}, @@ -341,6 +298,15 @@ handle_call(Other, _From, State) -> ?warning("authorize_rpc:handle_call(~p): unknown", [ Other ]), { reply, unknown_command, State}. +handle_cast({rvi, service_available, Svc}, State) -> + authorize_keys:cache_authorizations(Svc), + {noreply, State}; +handle_cast({rvi, service_unavailable, Svc}, State) -> + authorize_keys:remove_cached_authorizations(Svc), + {noreply, State}; +handle_cast({rvi, remove_connection, Conn}, State) -> + authorize_keys:remove_connection(Conn), + {noreply, State}; handle_cast(Other, State) -> ?warning("authorize_rpc:handle_cast(~p): unknown", [ Other ]), {noreply, State}. @@ -353,20 +319,14 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -do_store_creds(Creds, Conn, PeerCert, LogId) -> +do_store_creds(Creds, Conn, PeerCert, LogId, CS) -> ?debug("Storing ~p creds for conn ~p~nPeerCert = ~w", [length(Creds), Conn, authorize_keys:abbrev(PeerCert)]), + authorize_keys:remove_cached_authorizations_for_conn(Conn), lists:foreach(fun(Cred) -> store_cred(Cred, Conn, PeerCert, LogId) - end, Creds). - -get_json_element(Path, JSON, Default) -> - case rvi_common:get_json_element(Path, JSON) of - {ok, Value} -> - Value; - _ -> - Default - end. + end, Creds), + authorize_keys:update_authorization_cache(Conn, CS). store_cred(CredJWT, Conn, PeerCert, LogId) -> case authorize_sig:decode_jwt(authorize_keys:strip_nl(CredJWT), authorize_keys:provisioning_key()) of @@ -389,36 +349,3 @@ log([ID], Lvl, Fmt, Args) -> rvi_log:log(ID, Lvl, <<"authorize">>, rvi_log:format(Fmt, Args)); log(_, _, _, _) -> ok. - -%% check_msg(Checks, Params) -> -%% check_msg(Checks, Params, []). - - %% {ok, Timeout1} = rvi_common:get_json_element(["timeout"], Msg), - %% {ok, SvcName1} = rvi_common:get_json_element(["service_name"], Msg), - %% {ok, Params1} = rvi_common:get_json_element(["parameters"], Msg), - %% ?debug("authorize_rpc:authorize_remote_message(): timeout1: ~p~n", [Timeout1]), - %% ?debug("authorize_rpc:authorize_remote_message(): service_name1: ~p~n", [SvcName1]), - %% ?debug("authorize_rpc:authorize_remote_message(): parameters1: ~p~n", [Params1]), - - %% if Timeout =:= Timeout1 * 1000, - %% SvcName =:= SvcName1, - %% Parameters =:= Params1 -> - %% ?debug("Remote message authorized.~n", []), - %% {reply, [ ok ], State}; - %% true -> - %% ?debug("Remote message NOT authorized.~n", []), - %% {reply, [ not_found ], State} - %% end - %% end; - -%% check_msg([], _, []) -> -%% ok; -%% check_msg([{Key, Expect}|T], Msg, Acc) -> -%% case rvi_common:get_json_element([Key], Msg) of -%% {ok, Expect} -> -%% check_msg(T, Msg, Acc); -%% _ -> -%% check_msg(T, Msg, [Key|Acc]) -%% end; -%% check_msg([], _, [_|_] = Acc) -> -%% {error, {mismatch, lists:reverse(Acc)}}. diff --git a/components/dlink_bt/src/bt_connection_manager.erl b/components/dlink_bt/src/bt_connection_manager.erl index e86dda3..23a6609 100644 --- a/components/dlink_bt/src/bt_connection_manager.erl +++ b/components/dlink_bt/src/bt_connection_manager.erl @@ -35,6 +35,7 @@ -define(SERVER, ?MODULE). -record(st, { + cs, conn_by_pid = undefined, conn_by_addr = undefined }). @@ -87,6 +88,7 @@ start_link() -> %%-------------------------------------------------------------------- init([]) -> {ok, #st{ + cs = rvi_common:get_component_specification(), conn_by_pid = dict:new(), %% All managed connection stored by pid conn_by_addr = dict:new() %% All managed connection stored by address }}. @@ -114,36 +116,15 @@ handle_call({add_connection, BTAddr, Channel, Pid}, _From, %% Store so that we can find connection both by pid and by address NConPid = dict:store(Pid, { BTAddr, Channel }, ConPid), NConBTAddr = dict:store({ BTAddr, Channel }, Pid, ConBTAddr), - + erlang:monitor(process, Pid), NSt = St#st { conn_by_pid = NConPid, conn_by_addr = NConBTAddr }, {reply, ok, NSt}; %% Delete connection by pid -handle_call({delete_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConBTAddr} = St) when is_pid(Pid)-> - - %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(del_by_pid): not found: ~p", - [ ?MODULE, Pid]), - { reply, not_found, St}; - - {ok, BTAddr } -> - ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, BTAddress: ~p", - [ ?MODULE, Pid, BTAddr]), - - NConPid = dict:erase(Pid, ConPid), - NConBTAddr = dict:erase(BTAddr, ConBTAddr), - - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConBTAddr }, - - {reply, ok, NSt} - end; - +handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid)-> + {Res, NSt} = delete_connection_by_pid_(Pid, St), + {reply, Res, NSt}; %% Delete connection by address handle_call({ delete_connection_by_address, BTAddr, Channel}, _From, @@ -233,6 +214,9 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- +handle_info({'DOWN', _Ref, process, Pid, _}, St) -> + {_, NSt} = delete_connection_by_pid_(Pid, St), + {noreply, NSt}; handle_info(_Info, State) -> ?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]), {noreply, State}. @@ -265,3 +249,24 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +delete_connection_by_pid_(Pid, #st{conn_by_pid = ConPid, + conn_by_addr = ConBTAddr, + cs = CS} = St) -> + %% Find address associated with Pid + case dict:find(Pid, ConPid) of + error -> + ?debug("del_by_pid: not found: ~p", [Pid]), + {not_found, St}; + + {ok, BTAddr } -> + ?debug("del_by_pid: deleted Pid: ~p, BTAddress: ~p", [Pid, BTAddr]), + + NConPid = dict:erase(Pid, ConPid), + NConBTAddr = dict:erase(BTAddr, ConBTAddr), + + NSt = St#st { conn_by_pid = NConPid, + conn_by_addr = NConBTAddr }, + authorize_rpc:remove_connection(CS, BTAddr), + {ok, NSt} + end. diff --git a/components/dlink_sms/src/sms_connection_manager.erl b/components/dlink_sms/src/sms_connection_manager.erl index 0e59b0b..df84989 100644 --- a/components/dlink_sms/src/sms_connection_manager.erl +++ b/components/dlink_sms/src/sms_connection_manager.erl @@ -37,6 +37,7 @@ -define(SERVER, ?MODULE). -record(st, { + cs, conn_by_pid = undefined, conn_by_addr = undefined }). @@ -88,6 +89,7 @@ start_link() -> %%-------------------------------------------------------------------- init([]) -> {ok, #st{ + cs = rvi_common:get_component_specification(), conn_by_pid = dict:new(), %% All managed connection stored by pid conn_by_addr = dict:new() %% All managed connection stored by address }}. @@ -116,34 +118,18 @@ handle_call({add_connection, Addr, Pid}, _From, NConAddr = dict:store(Addr, Pid, ConAddr), NSt = St#st {conn_by_pid = NConPid, conn_by_addr = NConAddr}, + erlang:monitor(process, Pid), {reply, ok, NSt}; %% Delete connection by pid -handle_call({delete_connection_by_pid, Pid}, _From, - #st{conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) when is_pid(Pid)-> - %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(del_by_pid): not found: ~p", - [?MODULE, Pid]), - {reply, not_found, St}; - - {ok, Addr} -> - ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", - [?MODULE, Pid, Addr]), - NConPid = dict:erase(Pid, ConPid), - NConAddr = dict:erase(Addr, ConAddr), - - NSt = St#st{conn_by_pid = NConPid, - conn_by_addr = NConAddr}, - {reply, ok, NSt} - end; - +handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid) -> + {Res, NSt} = delete_connection_by_pid_(Pid, St), + {reply, Res, NSt}; %% Delete connection by address handle_call({delete_connection_by_address, Addr}, _From, - #st{conn_by_pid = ConPid, + #st{cs = CS, + conn_by_pid = ConPid, conn_by_addr = ConAddr} = St) -> %% Find Pid associated with Address @@ -158,6 +144,7 @@ handle_call({delete_connection_by_address, Addr}, _From, [?MODULE, Pid, Addr]), NConPid = dict:erase(Pid, ConPid), NConAddr = dict:erase(Addr, ConAddr), + authorize_rpc:remove_connection(Addr, CS), NSt = St#st {conn_by_pid = NConPid, conn_by_addr = NConAddr}, {reply, ok, NSt} @@ -225,6 +212,9 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- +handle_info({'DOWN', _Ref, process, Pid, _}, St) -> + {_, NSt} = delete_connection_by_pid_(Pid, St), + {noreply, NSt}; handle_info(_Info, State) -> ?warning("~p:handle_info(): Unknown info: ~p", [?MODULE, _Info]), {noreply, State}. @@ -257,3 +247,22 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +delete_connection_by_pid_(Pid, #st{cs = CS, + conn_by_pid = ConPid, + conn_by_addr = ConAddr} = St) -> + %% Find address associated with Pid + case dict:find(Pid, ConPid) of + error -> + ?debug("del_by_pid: not found: ~p", [Pid]), + {not_found, St}; + + {ok, Addr} -> + ?debug("del_by_pid: deleted Pid: ~p, Address: ~p", [Pid, Addr]), + NConPid = dict:erase(Pid, ConPid), + NConAddr = dict:erase(Addr, ConAddr), + authorize_rpc:remove_connection(Addr, CS), + NSt = St#st{conn_by_pid = NConPid, + conn_by_addr = NConAddr}, + {ok, NSt} + end. diff --git a/components/dlink_tcp/src/connection_manager.erl b/components/dlink_tcp/src/connection_manager.erl index e16f789..f279578 100644 --- a/components/dlink_tcp/src/connection_manager.erl +++ b/components/dlink_tcp/src/connection_manager.erl @@ -39,8 +39,7 @@ -define(ADDR_TAB, dlink_tcp_conn_by_addr). -record(st, { - conn_by_pid = undefined, - conn_by_addr = undefined + cs }). %%%=================================================================== @@ -93,8 +92,7 @@ start_link() -> %%-------------------------------------------------------------------- init([]) -> {ok, #st{ - conn_by_pid = dict:new(), %% All managed connection stored by pid - conn_by_addr = dict:new() %% All managed connection stored by address + cs = rvi_common:get_component_specification() }}. create_ets() -> @@ -127,28 +125,16 @@ handle_call({add_connection, IP, Port, Pid}, _From, St) -> ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p", [ ?MODULE, Pid, { IP, Port }]), %% Store so that we can find connection both by pid and by address + erlang:monitor(process, Pid), ets_insert(?PID_TAB, {Pid, {IP, Port}}), ets_insert(?ADDR_TAB, {{IP, Port}, Pid}), {reply, ok, St}; %% Delete connection by pid -handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid)-> - %% Find address associated with Pid - case ets_lookup(?PID_TAB, Pid) of - [] -> - ?debug("~p:handle_call(del_by_pid): not found: ~p", - [ ?MODULE, Pid]), - { reply, not_found, St}; - - [{_, Addr}] -> - ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", - [ ?MODULE, Pid, Addr]), - - ets_delete(?PID_TAB, Pid), - ets_delete(?ADDR_TAB, Addr), - {reply, ok, St} - end; - +handle_call({delete_connection_by_pid, Pid}, _From, #st{cs = CS} = St) + when is_pid(Pid)-> + Res = delete_connection_by_pid_(Pid, CS), + {reply, Res, St}; %% Delete connection by address handle_call({ delete_connection_by_address, IP, Port}, _From, St) -> @@ -231,6 +217,9 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- +handle_info({'DOWN', _Ref, process, Pid, _}, #st{cs = CS} = St) -> + delete_connection_by_pid_(Pid, CS), + {noreply, St}; handle_info(_Info, State) -> ?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]), {noreply, State}. @@ -276,3 +265,18 @@ ets_delete(Tab, Key) -> ets_select(Tab, Pattern) -> ets:select(Tab, Pattern). + +delete_connection_by_pid_(Pid, CS) -> + case ets_lookup(?PID_TAB, Pid) of + [] -> + ?debug("del_by_pid: not found: ~p", + [ Pid]), + not_found; + [{_, Addr}] -> + ?debug("del_by_pid: deleted Pid: ~p, Address: ~p", + [ Pid, Addr]), + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + authorize_rpc:remove_connection(Pid, CS), + ok + end. diff --git a/components/dlink_tls/src/dlink_tls_connmgr.erl b/components/dlink_tls/src/dlink_tls_connmgr.erl index 31e51bd..d27647d 100644 --- a/components/dlink_tls/src/dlink_tls_connmgr.erl +++ b/components/dlink_tls/src/dlink_tls_connmgr.erl @@ -38,7 +38,7 @@ -define(PID_TAB, dlink_tls_pid_tab). -define(ADDR_TAB, dlink_tls_addr_tab). --record(st, {}). +-record(st, {cs}). %%%=================================================================== %%% API @@ -101,7 +101,7 @@ maybe_create(Tab) -> %% @end %%-------------------------------------------------------------------- init([]) -> - {ok, #st{}}. + {ok, #st{cs = rvi_common:get_component_specification()}}. %%-------------------------------------------------------------------- %% @private @@ -122,28 +122,16 @@ handle_call({add_connection, IP, Port, Pid}, _From, St) -> ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p", [ ?MODULE, Pid, Addr]), %% Store so that we can find connection both by pid and by address + erlang:monitor(process, Pid), ets_insert(?PID_TAB, {Pid, Addr}), ets_insert(?ADDR_TAB, {Addr, Pid}), {reply, ok, St}; %% Delete connection by pid -handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid) -> - %% Find address associated with Pid - case ets_lookup(?PID_TAB, Pid) of - [] -> - ?debug("~p:handle_call(del_by_pid): not found: ~p", - [ ?MODULE, Pid]), - { reply, not_found, St}; - - [{_, Addr}] -> - ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", - [ ?MODULE, Pid, Addr]), - - ets_delete(?PID_TAB, Pid), - ets_delete(?ADDR_TAB, Addr), - {reply, ok, St} - end; - +handle_call({delete_connection_by_pid, Pid}, _From, #st{cs = CS} = St) + when is_pid(Pid) -> + Res = delete_connection_by_pid_(Pid, CS), + {reply, Res, St}; %% Delete connection by address handle_call({ delete_connection_by_address, IP, Port}, _From, St) -> @@ -224,6 +212,9 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- +handle_info({'DOWN', _Ref, process, Pid, _}, #st{cs = CS} = State) -> + delete_connection_by_pid_(Pid, CS), + {noreply, State}; handle_info(_Info, State) -> ?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]), {noreply, State}. @@ -268,3 +259,16 @@ ets_delete(Tab, Key) -> ets_select(Tab, Pat) -> ets:select(Tab, Pat). + +delete_connection_by_pid_(Pid, CS) -> + case ets_lookup(?PID_TAB, Pid) of + [] -> + ?debug("del_by_pid: not found: ~p", [ Pid]), + not_found; + [{_, Addr}] -> + ?debug("del_by_pid: deleted Pid: ~p, Address: ~p", [Pid, Addr]), + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + authorize_rpc:remove_connection(CS, Addr), + ok + end. |