diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2015-12-13 11:59:25 -0800 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2015-12-13 11:59:25 -0800 |
commit | 88a73d73db0e06818b27ec4e6caeb2099be4e2e0 (patch) | |
tree | 206434dc830eb15bbf3d1687e149293237b8a768 /components | |
parent | 2572d8c0c0ea55b55f4225005bcf706084bd456d (diff) | |
download | rvi_core-88a73d73db0e06818b27ec4e6caeb2099be4e2e0.tar.gz |
More robust listeners, lots of bugfixes for (and in) test suite
Diffstat (limited to 'components')
-rw-r--r-- | components/authorize/src/authorize_keys.erl | 22 | ||||
-rw-r--r-- | components/dlink_bt/src/bt_connection.erl | 12 | ||||
-rw-r--r-- | components/dlink_bt/src/bt_listener.erl | 2 | ||||
-rw-r--r-- | components/dlink_bt/src/dlink_bt_rpc.erl | 14 | ||||
-rw-r--r-- | components/dlink_tcp/src/connection.erl | 2 | ||||
-rw-r--r-- | components/dlink_tcp/src/connection_manager.erl | 145 | ||||
-rw-r--r-- | components/dlink_tcp/src/dlink_tcp_rpc.erl | 10 | ||||
-rw-r--r-- | components/dlink_tcp/src/dlink_tcp_sup.erl | 5 | ||||
-rw-r--r-- | components/dlink_tcp/src/gen_nb_server.erl | 3 | ||||
-rw-r--r-- | components/dlink_tcp/src/listener.erl | 53 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_conn.erl | 53 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_connmgr.erl | 132 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_listener.erl | 57 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_rpc.erl | 30 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_sup.erl | 2 | ||||
-rw-r--r-- | components/proto_msgpack/src/proto_msgpack_rpc.erl | 15 | ||||
-rw-r--r-- | components/rvi_common/src/rvi_log.erl | 2 | ||||
-rw-r--r-- | components/service_edge/src/wse_server.erl | 30 |
18 files changed, 362 insertions, 227 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index a362c7b..f651a60 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -197,14 +197,16 @@ handle_call_({validate_message, JWT, Conn}, _, S) -> {reply, validate_message_(JWT, Conn), S}; handle_call_({validate_service_call, Svc, Conn}, _, S) -> {reply, validate_service_call_(Svc, Conn), S}; -handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn, PeerCert, LogId}, _, S) -> +handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn0, PeerCert, LogId}, _, S) -> + Conn = normalize_conn(Conn0), + ?debug("save_cred: ~p (Conn=~p, PeerCert=~p)", [Cred, Conn, abbrev(PeerCert)]), case process_cred_struct(Cred, JWT, PeerCert) of invalid -> log(LogId, warning, "cred INVALID Conn=~s:~w", [IP, Port]), {reply, {error, invalid}, S}; #cred{} = C -> ets:insert(?CREDS, {{Conn, C#cred.id}, C}), - log(LogId, result, "cred stored ~s Conn=~s:~w", [abbrev_bin(C#cred.id), IP, Port]), + log(LogId, result, "cred stored ~s Conn=~p", [abbrev_bin(C#cred.id), Conn]), {reply, ok, S} end; handle_call_({filter_by_service, Services, Conn} =R, _From, State) -> @@ -244,14 +246,26 @@ creds_by_conn(Conn) -> ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Creds]]), [C || {C,V} <- Creds, check_validity(V, UTC)]. -cred_recs_by_conn(Conn) -> - ?debug("cred_recs_by_conn(~p)~n", [Conn]), +cred_recs_by_conn(Conn0) -> + Conn = normalize_conn(Conn0), + ?debug("cred_recs_by_conn(~p)~nAll = ~p", [Conn, abbrev(ets:tab2list(?CREDS))]), UTC = rvi_common:utc_timestamp(), Creds = ets:select(?CREDS, [{ {{Conn,'_'}, '$1'}, [], ['$1'] }]), ?debug("rough selection: ~p~n", [[abbrev_bin(C#cred.id) || C <- Creds]]), [C || C <- Creds, check_validity(C#cred.validity, UTC)]. +normalize_conn(local) -> + local; +normalize_conn({IP, Port} = Conn) when is_binary(IP), is_binary(Port) -> + Conn; +normalize_conn({IP, Port}) -> + {to_bin(IP), to_bin(Port)}. + +to_bin(B) when is_binary(B) -> B; +to_bin(L) when is_list(L) -> iolist_to_binary(L); +to_bin(I) when is_integer(I) -> integer_to_binary(I). + filter_by_service_(Services, Conn) -> ?debug("Filter: creds = ~p", [[{K,abbrev_payload(V)} || {K,V} <- ets:tab2list(?CREDS)]]), Invoke = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{right_to_invoke = '$1', diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl index 6b3a64e..bcfa199 100644 --- a/components/dlink_bt/src/bt_connection.erl +++ b/components/dlink_bt/src/bt_connection.erl @@ -138,7 +138,7 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) -> {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS), PktSt = PktMod:init(CS), {ok, #st{ - remote_addr = BTAddr, + remote_addr = bt_addr(Mode, BTAddr), channel = Channel, rfcomm_ref = undefined, mode = Mode, @@ -394,9 +394,10 @@ handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod, inet_db:register_socket(Sock, inet_tcp), inet:setopts(Sock, [{active, once}]), {ok, {BTAddr, Channel}} = inet:peername(Sock), + ?debug("peername (tcp): ~p:~p", [BTAddr, Channel]), Mod:Fun(self(), BTAddr, Channel, accepted, Arg), {noreply, St#st{rfcomm_ref = Sock, - remote_addr = BTAddr}}; + remote_addr = bt_addr(tcp, BTAddr)}}; handle_info(_Info, State) -> ?warning("~p:handle_info(): Unknown info: ~p", [ ?MODULE, _Info]), @@ -442,3 +443,10 @@ handle_elements(Elements, #st{remote_addr = BTAddr, ?debug("data complete; processed: ~p", [authorize_keys:abbrev(Elements)]), Mod:Fun(self(), BTAddr, Channel, data, Elements, Arg). + + +bt_addr(tcp, Addr) -> + {ok, IP} = inet:ip(Addr), + inet_parse:ntoa(IP); +bt_addr(bt, Addr) -> + Addr. diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl index a1f1a49..06efd3d 100644 --- a/components/dlink_bt/src/bt_listener.erl +++ b/components/dlink_bt/src/bt_listener.erl @@ -42,7 +42,7 @@ accept_ack(Result, LRef, Addr, Chan) -> ok. sock_opts() -> - [binary, {active, once}, {packet, 0}]. + [{reuseaddr, true}, binary, {active, once}, {packet, 0}]. init(Mode) -> diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl index 69310a7..c675387 100644 --- a/components/dlink_bt/src/dlink_bt_rpc.erl +++ b/components/dlink_bt/src/dlink_bt_rpc.erl @@ -651,11 +651,19 @@ code_change(_OldVsn, St, _Extra) -> send_authorize(Pid, SetupChannel, CompSpec) -> - {ok,[{address, Address }]} = bt_drv:local_info([address]), + {Address, Channel} = + case Mode = get_mode(CompSpec) of + bt -> + {ok,[{address, Addr}]} = bt_drv:local_info([address]), + {bt_address_to_string(Addr), SetupChannel}; + tcp -> + {IP, Port} = rvi_common:node_address_tuple(), + {IP, integer_to_binary(Port)} + end, bt_connection:send(Pid, [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) }, - { ?DLINK_ARG_PORT, SetupChannel }, + { ?DLINK_ARG_ADDRESS, Address }, + { ?DLINK_ARG_PORT, Channel }, { ?DLINK_ARG_VERSION, ?DLINK_BT_VER }, { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) } | log_id_tail(CompSpec)]). diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index b24215c..7229b3c 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -231,7 +231,7 @@ handle_info({tcp, Sock, Data}, port = Port, packet_mod = PMod, packet_st = PSt} = State) -> - ?debug("handle_info(data): From: ~p:~p ", [IP, Port]), + ?debug("handle_info(data, PMod=~p): From: ~p:~p ", [PMod, IP, Port]), case PMod:decode(Data, fun(Elems) -> handle_elements(Elems, State) end, PSt) of diff --git a/components/dlink_tcp/src/connection_manager.erl b/components/dlink_tcp/src/connection_manager.erl index 6a9f1e0..e16f789 100644 --- a/components/dlink_tcp/src/connection_manager.erl +++ b/components/dlink_tcp/src/connection_manager.erl @@ -2,10 +2,10 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% -%% +%% %%%------------------------------------------------------------------- %%% @author magnus <magnus@t520.home> %%% @copyright (C) 2014, magnus @@ -33,7 +33,10 @@ -export([find_connection_by_address/2]). -export([connections/0]). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). + +-define(PID_TAB, dlink_tcp_conn_by_pid). +-define(ADDR_TAB, dlink_tcp_conn_by_addr). -record(st, { conn_by_pid = undefined, @@ -70,6 +73,7 @@ connections() -> %% @end %%-------------------------------------------------------------------- start_link() -> + create_ets(), gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%%=================================================================== @@ -93,6 +97,18 @@ init([]) -> conn_by_addr = dict:new() %% All managed connection stored by address }}. +create_ets() -> + maybe_create(?PID_TAB), + maybe_create(?ADDR_TAB). + +maybe_create(Tab) -> + case ets:info(Tab, name) of + undefined -> + ets:new(Tab, [public, named_table, set]); + _ -> + Tab + end. + %%-------------------------------------------------------------------- %% @private %% @doc @@ -107,106 +123,84 @@ init([]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call({add_connection, IP, Port, Pid}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) -> - - ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p", +handle_call({add_connection, IP, Port, Pid}, _From, St) -> + ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p", [ ?MODULE, Pid, { IP, Port }]), %% Store so that we can find connection both by pid and by address - NConPid = dict:store(Pid, { IP, Port }, ConPid), - NConAddr = dict:store({ IP, Port }, Pid, ConAddr), - - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - {reply, ok, NSt}; + ets_insert(?PID_TAB, {Pid, {IP, Port}}), + ets_insert(?ADDR_TAB, {{IP, Port}, Pid}), + {reply, ok, St}; %% Delete connection by pid -handle_call({delete_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) when is_pid(Pid)-> - +handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid)-> %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(del_by_pid): not found: ~p", + case ets_lookup(?PID_TAB, Pid) of + [] -> + ?debug("~p:handle_call(del_by_pid): not found: ~p", [ ?MODULE, Pid]), { reply, not_found, St}; - - {ok, Addr } -> - ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", - [ ?MODULE, Pid, Addr]), - - NConPid = dict:erase(Pid, ConPid), - NConAddr = dict:erase(Addr, ConAddr), - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, + [{_, Addr}] -> + ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", + [ ?MODULE, Pid, Addr]), - {reply, ok, NSt} + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + {reply, ok, St} end; %% Delete connection by address -handle_call({ delete_connection_by_address, IP, Port}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) -> - +handle_call({ delete_connection_by_address, IP, Port}, _From, St) -> %% Find Pid associated with Address - case dict:find({IP, Port}, ConAddr) of - error -> - ?debug("~p:handle_call(del_by_addr): not found: ~p", - [ ?MODULE, {IP, Port}]), + Addr = {IP, Port}, + case ets_lookup(?ADDR_TAB, Addr) of + [] -> + ?debug("~p:handle_call(del_by_addr): not found: ~p", + [ ?MODULE, Addr]), { reply, not_found, St}; - - {ok, Pid } -> - ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p", - [ ?MODULE, Pid, {IP, Port}]), - NConPid = dict:erase(Pid, ConPid), - NConAddr = dict:erase({ IP, Port }, ConAddr), - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - {reply, ok, NSt} - end; + [{_, Pid}] -> + ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p", + [ ?MODULE, Pid, Addr]), + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + {reply, ok, St} + end; %% Find connection by pid -handle_call({ find_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid} = St) when is_pid(Pid)-> - +handle_call({ find_connection_by_pid, Pid}, _From, St) when is_pid(Pid)-> %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(find_by_pid): not found: ~p", + case ets_lookup(?PID_TAB, Pid) of + [] -> + ?debug("~p:handle_call(find_by_pid): not found: ~p", [ ?MODULE, Pid]), { reply, not_found, St}; - - {ok, {IP, Port} } -> - ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p", + + [{_, {IP, Port}}] -> + ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p", [ ?MODULE, Pid, {IP, Port}]), {reply, {ok, IP, Port}, St} end; %% Find connection by address -handle_call({find_connection_by_address, IP, Port}, _From, - #st { conn_by_addr = ConAddr} = St) -> - +handle_call({find_connection_by_address, IP, Port}, _From, St) -> %% Find address associated with Pid - case dict:find({IP, Port}, ConAddr) of - error -> - ?debug("~p:handle_call(find_by_addr): not found: ~p", + case ets_lookup(?ADDR_TAB, {IP, Port}) of + [] -> + ?debug("~p:handle_call(find_by_addr): not found: ~p", [ ?MODULE, {IP, Port}]), { reply, not_found, St}; - - {ok, Pid } -> - ?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p", + + [{_, Pid}] -> + ?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p", [ ?MODULE, {IP, Port}, Pid]), {reply, {ok, Pid}, St} end; -handle_call(connections, _From, #st{conn_by_addr = ConAddr} = St) -> - {reply, [Addr || {Addr, _} <- dict:to_list(ConAddr)], St}; +handle_call(connections, _From, St) -> + {reply, ets_select(?ADDR_TAB, [{ {'$1','_'}, [], ['$1']}]), St}; handle_call(_Request, _From, State) -> ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), @@ -269,3 +263,16 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +%% Ets wrapper functions to simplify tracing. +ets_lookup(Tab, Key) -> + ets:lookup(Tab, Key). + +ets_insert(Tab, Obj) -> + ets:insert(Tab, Obj). + +ets_delete(Tab, Key) -> + ets:delete(Tab, Key). + +ets_select(Tab, Pattern) -> + ets:select(Tab, Pattern). diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index 83e0a24..68d32d7 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -94,11 +94,11 @@ start_json_server() -> start_connection_manager() -> %% Fire up listener CompSpec = rvi_common:get_component_specification(), - connection_manager:start_link(), + %% connection_manager:start_link(), ?info("dlink_tcp:init_rvi_component(~p): Starting listener.", [self()]), - {ok,Pid} = listener:start_link(), + %% {ok,Pid} = listener:start_link(), %% - setup_initial_listeners(Pid, CompSpec), + setup_initial_listeners(CompSpec), ?info("dlink_tcp:init_rvi_component(): Setting up persistent connections."), @@ -112,7 +112,7 @@ start_connection_manager() -> ok. -setup_initial_listeners(Pid, CompSpec) -> +setup_initial_listeners(CompSpec) -> case rvi_common:get_module_config(data_link, ?MODULE, ?SERVER_OPTS, @@ -123,7 +123,7 @@ setup_initial_listeners(Pid, CompSpec) -> ?info("dlink_tcp:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), %% %% Add listener port. - case listener:add_listener(Pid, IP, Port, CompSpec) of + case listener:add_listener(IP, Port, CompSpec) of ok -> ?notice("---- RVI Node External Address: ~s", [ application:get_env(rvi_core, node_address, undefined)]); diff --git a/components/dlink_tcp/src/dlink_tcp_sup.erl b/components/dlink_tcp/src/dlink_tcp_sup.erl index edb9c82..04f5255 100644 --- a/components/dlink_tcp/src/dlink_tcp_sup.erl +++ b/components/dlink_tcp/src/dlink_tcp_sup.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -34,6 +34,7 @@ start_link() -> init([]) -> {ok, { {one_for_one, 5, 10}, [ + ?CHILD(connection_manager, worker), + ?CHILD(listener, worker), ?CHILD(dlink_tcp_rpc, worker) ]} }. - diff --git a/components/dlink_tcp/src/gen_nb_server.erl b/components/dlink_tcp/src/gen_nb_server.erl index af72189..ae0a605 100644 --- a/components/dlink_tcp/src/gen_nb_server.erl +++ b/components/dlink_tcp/src/gen_nb_server.erl @@ -43,6 +43,7 @@ terminate/2, code_change/3]). +-include_lib("lager/include/log.hrl"). -define(SERVER, ?MODULE). -record(state, {cb, @@ -203,11 +204,13 @@ code_change(_OldVsn, State, _Extra) -> %% Result = {ok, port()} | {error, any()} listen_on(CallbackModule, IpAddr, Port) -> SockOpts = [{reuseaddr, true}, {ip, convert(IpAddr)}] ++ CallbackModule:sock_opts(), + ?debug("listen on ~p:~p, Opts = ~p", [IpAddr, Port, SockOpts]), case gen_tcp:listen(Port, SockOpts) of {ok, LSock} -> {ok, _Ref} = prim_inet:async_accept(LSock, -1), {ok, LSock}; Err -> + ?debug("listen error: ~p", [Err]), Err end. diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl index 4512a59..6def59a 100644 --- a/components/dlink_tcp/src/listener.erl +++ b/components/dlink_tcp/src/listener.erl @@ -13,29 +13,56 @@ -include_lib("lager/include/log.hrl"). -export([start_link/0, - add_listener/4, - remove_listener/3]). + add_listener/3, + remove_listener/2]). -export([init/2, handle_call/3, handle_cast/2, handle_info/2]). -export([terminate/2, sock_opts/0, new_connection/4]). -behavior(gen_nb_server). +-define(TAB, dlink_tcp_listener_tab). + start_link() -> - gen_nb_server:start_link(?MODULE, []). + create_ets(), + gen_nb_server:start_link({local, ?MODULE}, ?MODULE, []). + +create_ets() -> + case ets:info(?TAB, name) of + undefined -> ets:new(?TAB, [public, named_table, set]); + _ -> ?TAB + end. -add_listener(Pid, IpAddr, Port, CompSpec) -> - gen_server:call(Pid, {add_listener, IpAddr, Port, CompSpec}). +add_listener(IpAddr, Port, CompSpec) -> + gen_server:call(?MODULE, {add_listener, IpAddr, Port, CompSpec}). -remove_listener(Pid, IpAddr, Port) -> - gen_server:call(Pid, {remove_listener, IpAddr, Port}). +remove_listener(IpAddr, Port) -> + gen_server:call(?MODULE, {remove_listener, IpAddr, Port}). init([], State) -> - {ok, State}. + case ets_select(?TAB, [{ '_', [], ['$_'] }]) of + [] -> + {ok, State}; + Addrs -> + lists:foldl( + fun({{_, _} = Addr}, Acc) -> + case gen_nb_server:add_listen_socket(Addr, Acc) of + {ok, Acc1} -> + Acc1; + _Error -> + ets_delete(?TAB, Addr), + Acc + end; + ({cs, CS}, Acc) -> + gen_nb_server:store_cb_state(CS, Acc) + end, State, Addrs) + end. handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> + ets_insert(?TAB, {cs, CompSpec}), case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of {ok, State1} -> + ets_insert(?TAB, {{IpAddr,Port}}), {reply, ok, gen_nb_server:store_cb_state( CompSpec, State1 )}; Error -> @@ -45,6 +72,7 @@ handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> handle_call({remove_listener, IpAddr, Port}, _From, State) -> case gen_nb_server:remove_listen_socket({IpAddr, Port}, State) of {ok, State1} -> + ets_delete(?TAB, {IpAddr, Port}), {reply, ok, State1}; Error -> {reply, Error, State} @@ -77,3 +105,12 @@ new_connection(IP, Port, Sock, State) -> dlink_tcp_rpc, handle_socket, gen_nb_server:get_cb_state(State)), {ok, State}. + +ets_insert(Tab, Obj) -> + ets:insert(Tab, Obj). + +ets_delete(Tab, Key) -> + ets:delete(Tab, Key). + +ets_select(Tab, Pat) -> + ets:select(Tab, Pat). diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl index 93266b1..be55dc6 100644 --- a/components/dlink_tls/src/dlink_tls_conn.erl +++ b/components/dlink_tls/src/dlink_tls_conn.erl @@ -28,7 +28,8 @@ terminate/2, code_change/3]). -export([setup/6]). --export([upgrade/3]). +-export([upgrade/3, + async_upgrade/3]). -export([send/2]). -export([send/3]). -export([is_connection_up/1]). @@ -77,6 +78,10 @@ setup(IP, Port, Sock, Mod, Fun, CompSpec) -> upgrade(Pid, Role, CompSpec) when Role==client; Role==server -> gen_server:call(Pid, {upgrade, Role, CompSpec}). +async_upgrade(Pid, Role, CompSpec) when Role==client; + Role==server -> + gen_server:cast(Pid, {upgrade, Role, CompSpec}). + send(Pid, Data) when is_pid(Pid) -> gen_server:cast(Pid, {send, Data}). @@ -186,26 +191,9 @@ handle_call(terminate_connection, _From, St) -> {stop, Reason, ok, NSt}; handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) -> ?debug("~p:handle_call(~p)~n", [?MODULE, Req]), - - {ok, [{active, Last}]} = inet:getopts(S, [active]), - inet:setopts(S, [{active, false}]), - case do_upgrade(S, Role, CompSpec) of - {ok, NewS} -> - ?debug("upgrade to TLS succcessful~n", []), - ssl:setopts(NewS, [{active, Last}]), - {ok, {IP, Port}} = ssl:peername(NewS), - {ok, PeerCert} = ssl:peercert(NewS), - ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]), - NewCS = rvi_common:set_value( - dlink_tls_role, Role, - rvi_common:set_value(dlink_tls_peer_cert, PeerCert, CompSpec)), - {reply, ok, St#st{sock = NewS, mode = tls, role = Role, - ip = inet_parse:ntoa(IP), port = Port, - cs = NewCS}}; - Error -> - ?error("Cannot upgrade to TLS: ~p~n", [Error]), - {stop, Error, Error, St} - end; + %% deliberately crash (for now) if upgrade fails. + {Reply, St1} = handle_upgrade(Role, CompSpec, St), + {reply, Reply, St1}; handle_call(_Request, _From, State) -> ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), Reply = ok, @@ -221,6 +209,9 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- +handle_cast({upgrade, Role, CompSpec}, St) -> + {_, St1} = handle_upgrade(Role, CompSpec, St), + {noreply, St1}; handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> ?debug("~p:handle_call(send): Sending: ~p", [ ?MODULE, abbrev(Data)]), @@ -350,6 +341,26 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== +handle_upgrade(Role, CompSpec, #st{sock = S} = St) -> + {ok, [{active, Last}]} = inet:getopts(S, [active]), + inet:setopts(S, [{active, false}]), + case do_upgrade(S, Role, CompSpec) of + {ok, NewS} -> + ?debug("upgrade to TLS succcessful~n", []), + ssl:setopts(NewS, [{active, Last}]), + {ok, {IP, Port}} = ssl:peername(NewS), + {ok, PeerCert} = ssl:peercert(NewS), + ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]), + NewCS = rvi_common:set_value( + dlink_tls_role, Role, + rvi_common:set_value(dlink_tls_peer_cert, PeerCert, CompSpec)), + {ok, St#st{sock = NewS, mode = tls, role = Role, + ip = inet_parse:ntoa(IP), port = Port, + cs = NewCS}}; + Error -> + ?error("Cannot upgrade to TLS: ~p~n", [Error]), + error({cannot_upgrade, Error}) + end. do_upgrade(Sock, client, CompSpec) -> Opts = tls_opts(client, CompSpec), diff --git a/components/dlink_tls/src/dlink_tls_connmgr.erl b/components/dlink_tls/src/dlink_tls_connmgr.erl index 4947ee6..31e51bd 100644 --- a/components/dlink_tls/src/dlink_tls_connmgr.erl +++ b/components/dlink_tls/src/dlink_tls_connmgr.erl @@ -35,11 +35,10 @@ -export([connections/0]). -define(SERVER, ?MODULE). +-define(PID_TAB, dlink_tls_pid_tab). +-define(ADDR_TAB, dlink_tls_addr_tab). --record(st, { - conn_by_pid = undefined, - conn_by_addr = undefined - }). +-record(st, {}). %%%=================================================================== %%% API @@ -71,8 +70,21 @@ connections() -> %% @end %%-------------------------------------------------------------------- start_link() -> + create_ets(), gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +create_ets() -> + maybe_create(?PID_TAB), + maybe_create(?ADDR_TAB). + +maybe_create(Tab) -> + case ets:info(Tab, name) of + undefined -> + ets:new(Tab, [public, named_table, set]); + _ -> + Tab + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -89,10 +101,7 @@ start_link() -> %% @end %%-------------------------------------------------------------------- init([]) -> - {ok, #st{ - conn_by_pid = dict:new(), %% All managed connection stored by pid - conn_by_addr = dict:new() %% All managed connection stored by address - }}. + {ok, #st{}}. %%-------------------------------------------------------------------- %% @private @@ -108,106 +117,83 @@ init([]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call({add_connection, IP, Port, Pid}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) -> - +handle_call({add_connection, IP, Port, Pid}, _From, St) -> + Addr = {IP, Port}, ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p", - [ ?MODULE, Pid, { IP, Port }]), + [ ?MODULE, Pid, Addr]), %% Store so that we can find connection both by pid and by address - NConPid = dict:store(Pid, { IP, Port }, ConPid), - NConAddr = dict:store({ IP, Port }, Pid, ConAddr), - - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - {reply, ok, NSt}; + ets_insert(?PID_TAB, {Pid, Addr}), + ets_insert(?ADDR_TAB, {Addr, Pid}), + {reply, ok, St}; %% Delete connection by pid -handle_call({delete_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) when is_pid(Pid)-> - +handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid) -> %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> + case ets_lookup(?PID_TAB, Pid) of + [] -> ?debug("~p:handle_call(del_by_pid): not found: ~p", [ ?MODULE, Pid]), { reply, not_found, St}; - {ok, Addr } -> + [{_, Addr}] -> ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", [ ?MODULE, Pid, Addr]), - NConPid = dict:erase(Pid, ConPid), - NConAddr = dict:erase(Addr, ConAddr), - - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - - {reply, ok, NSt} + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + {reply, ok, St} end; %% Delete connection by address -handle_call({ delete_connection_by_address, IP, Port}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) -> - +handle_call({ delete_connection_by_address, IP, Port}, _From, St) -> %% Find Pid associated with Address - case dict:find({IP, Port}, ConAddr) of - error -> + Addr = {IP, Port}, + case ets_lookup(?ADDR_TAB, Addr) of + [] -> ?debug("~p:handle_call(del_by_addr): not found: ~p", [ ?MODULE, {IP, Port}]), { reply, not_found, St}; - - {ok, Pid } -> + [{_, Pid}] -> ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p", [ ?MODULE, Pid, {IP, Port}]), - NConPid = dict:erase(Pid, ConPid), - NConAddr = dict:erase({ IP, Port }, ConAddr), - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - {reply, ok, NSt} + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + {reply, ok, St} end; %% Find connection by pid -handle_call({ find_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid} = St) when is_pid(Pid)-> - +handle_call({ find_connection_by_pid, Pid}, _From, St) when is_pid(Pid)-> %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(find_by_pid): not found: ~p", - [ ?MODULE, Pid]), + case ets_lookup(?PID_TAB, Pid) of + [] -> + ?debug("~p:handle_call(find_by_pid): not found: ~p~n~p", + [ ?MODULE, Pid, ets:tab2list(?PID_TAB)]), { reply, not_found, St}; - - {ok, {IP, Port} } -> + [{_, {IP, Port}}] -> ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p", [ ?MODULE, Pid, {IP, Port}]), {reply, {ok, IP, Port}, St} end; %% Find connection by address -handle_call({find_connection_by_address, IP, Port}, _From, - #st { conn_by_addr = ConAddr} = St) -> - +handle_call({find_connection_by_address, IP, Port}, _From, St) -> %% Find address associated with Pid - case dict:find({IP, Port}, ConAddr) of - error -> + Addr = {IP, Port}, + case ets_lookup(?ADDR_TAB, Addr) of + [] -> ?debug("~p:handle_call(find_by_addr): not found: ~p", - [ ?MODULE, {IP, Port}]), - + [ ?MODULE, Addr]), { reply, not_found, St}; - - {ok, Pid } -> + [{_, Pid}] -> ?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p", - [ ?MODULE, {IP, Port}, Pid]), + [ ?MODULE, Addr, Pid]), {reply, {ok, Pid}, St} end; -handle_call(connections, _From, #st{conn_by_addr = ConAddr} = St) -> - {reply, [Addr || {Addr, _} <- dict:to_list(ConAddr)], St}; +handle_call(connections, _From, St) -> + {reply, ets_select(?ADDR_TAB, [{ {'$1','_'}, [], ['$1'] }]), St}; handle_call(_Request, _From, State) -> ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), @@ -270,3 +256,15 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +ets_lookup(Tab, Key) -> + ets:lookup(Tab, Key). + +ets_insert(Tab, Obj) -> + ets:insert(Tab, Obj). + +ets_delete(Tab, Key) -> + ets:delete(Tab, Key). + +ets_select(Tab, Pat) -> + ets:select(Tab, Pat). diff --git a/components/dlink_tls/src/dlink_tls_listener.erl b/components/dlink_tls/src/dlink_tls_listener.erl index 0effc66..7fdc4d2 100644 --- a/components/dlink_tls/src/dlink_tls_listener.erl +++ b/components/dlink_tls/src/dlink_tls_listener.erl @@ -14,29 +14,57 @@ -include_lib("lager/include/log.hrl"). -export([start_link/0, - add_listener/4, - remove_listener/3]). + add_listener/3, + remove_listener/2]). -export([init/2, handle_call/3, handle_cast/2, handle_info/2]). -export([terminate/2, sock_opts/0, new_connection/4]). -behavior(gen_nb_server). +-define(TAB, dlink_tls_listener_tab). + start_link() -> - gen_nb_server:start_link(?MODULE, []). + create_tabs(), + gen_nb_server:start_link({local, ?MODULE}, ?MODULE, []). -add_listener(Pid, IpAddr, Port, CompSpec) -> - gen_server:call(Pid, {add_listener, IpAddr, Port, CompSpec}). +add_listener(IpAddr, Port, CompSpec) -> + gen_server:call(?MODULE, {add_listener, IpAddr, Port, CompSpec}). -remove_listener(Pid, IpAddr, Port) -> - gen_server:call(Pid, {remove_listener, IpAddr, Port}). +remove_listener(IpAddr, Port) -> + gen_server:call(?MODULE, {remove_listener, IpAddr, Port}). init([], State) -> - {ok, State}. + State1 = + lists:foldl( + fun({{_,_}} = Addr, Acc) -> + case gen_nb_server:add_listen_socket(Addr, Acc) of + {ok, Acc1} -> + ets_insert(?TAB, {Addr}), + Acc1; + _Error -> + ets_delete(?TAB, Addr), + Acc + end; + ({cs, CS}, Acc) -> + ets_insert(?TAB, {cs, CS}), + gen_nb_server:store_cb_state(CS, Acc) + end, State, ets_select(?TAB, [{ '_', [], ['$_'] }])), + {ok, State1}. + +create_tabs() -> + case ets:info(?TAB, name) of + undefined -> + ets:new(?TAB, [public, named_table, set]); + _ -> + ?TAB + end. handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> + ets_insert(?TAB, {cs, CompSpec}), case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of {ok, State1} -> + ets_insert(?TAB, {{IpAddr, Port}}), {reply, ok, gen_nb_server:store_cb_state( CompSpec, State1 )}; Error -> @@ -46,6 +74,7 @@ handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> handle_call({remove_listener, IpAddr, Port}, _From, State) -> case gen_nb_server:remove_listen_socket({IpAddr, Port}, State) of {ok, State1} -> + ets_delete(?TAB, {IpAddr, Port}), {reply, ok, State1}; Error -> {reply, Error, State} @@ -79,5 +108,15 @@ new_connection(IP, Port, Sock, State) -> undefined, 0, Sock, dlink_tls_rpc, handle_socket, CompSpec), - dlink_tls_conn:upgrade(P, server, CompSpec), + dlink_tls_conn:async_upgrade(P, server, CompSpec), {ok, State}. + + +ets_insert(Tab, Obj) -> + ets:insert(Tab, Obj). + +ets_delete(Tab, Key) -> + ets:delete(Tab, Key). + +ets_select(Tab, Pat) -> + ets:select(Tab, Pat). diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl index 2f47792..c0fb3a5 100644 --- a/components/dlink_tls/src/dlink_tls_rpc.erl +++ b/components/dlink_tls/src/dlink_tls_rpc.erl @@ -103,10 +103,10 @@ start_connection_manager() -> ?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]), %% Fire up listener - dlink_tls_connmgr:start_link(), - {ok,Pid} = dlink_tls_listener:start_link(), + %% dlink_tls_connmgr:start_link(), + %% {ok,Pid} = dlink_tls_listener:start_link(), - setup_initial_listeners(Pid, TlsOpts, CompSpec), + setup_initial_listeners(TlsOpts, CompSpec), ?info("dlink_tls:init_rvi_component(): Setting up persistent connections."), @@ -118,14 +118,14 @@ start_connection_manager() -> setup_persistent_connections_(PersistentConnections, CompSpec), ok. -setup_initial_listeners(Pid, [], CompSpec) -> +setup_initial_listeners([], CompSpec) -> ?debug("no initial listeners", []); -setup_initial_listeners(Pid, [_|_] = TlsOpts, CompSpec) -> +setup_initial_listeners([_|_] = TlsOpts, CompSpec) -> IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS), Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT), %% Add listener port. ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), - case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of + case dlink_tls_listener:add_listener(IP, Port, CompSpec) of ok -> ?notice("---- RVI Node External Address: ~s", [ application:get_env(rvi_core, node_address, undefined)]); @@ -212,12 +212,18 @@ connect_remote(IP, Port, CompSpec) -> %% Setup a genserver around the new connection. {ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock, ?MODULE, handle_socket, CompSpec), - UgRes = dlink_tls_conn:upgrade(Pid, client, CompSpec), - ?debug("Upgrade result = ~p", [UgRes]), - %% Send authorize - send_authorize(Pid, CompSpec), - ok; - + try dlink_tls_conn:upgrade(Pid, client, CompSpec) of + ok -> + ?debug("Upgrade result = ~p", [ok]), + %% Send authorize + send_authorize(Pid, CompSpec), + ok + catch + error:Error -> + ?error("TLS upgrade (~p,~p) failed ~p", + [IP, Port, Error]), + not_available + end; {error, Err } -> ?info("dlink_tls:connect_remote(): Failed ~p:~p: ~p", [IP, Port, Err]), diff --git a/components/dlink_tls/src/dlink_tls_sup.erl b/components/dlink_tls/src/dlink_tls_sup.erl index cd59434..2ede068 100644 --- a/components/dlink_tls/src/dlink_tls_sup.erl +++ b/components/dlink_tls/src/dlink_tls_sup.erl @@ -35,5 +35,7 @@ start_link() -> init([]) -> {ok, { {one_for_one, 5, 10}, [ + ?CHILD(dlink_tls_connmgr, worker), + ?CHILD(dlink_tls_listener, worker), ?CHILD(dlink_tls_rpc, worker) ]} }. diff --git a/components/proto_msgpack/src/proto_msgpack_rpc.erl b/components/proto_msgpack/src/proto_msgpack_rpc.erl index 2b1f59c..e718e4d 100644 --- a/components/proto_msgpack/src/proto_msgpack_rpc.erl +++ b/components/proto_msgpack/src/proto_msgpack_rpc.erl @@ -75,6 +75,7 @@ receive_message(CompSpec, {IP, Port}, Data) -> %% CAlled by local exo http server handle_rpc("send_message", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, TID} = rvi_common:get_json_element(["transaction_id"], Args), {ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), @@ -89,7 +90,8 @@ handle_rpc("send_message", Args) -> ProtoOpts, DataLinkMod, DataLinkOpts, - Parameters]}), + Parameters, + LogId]}), {ok, [ {status, rvi_common:json_rpc_status(ok)} ]}; @@ -99,10 +101,14 @@ handle_rpc(Other, _Args) -> handle_notification("receive_message", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, Data} = rvi_common:get_json_element(["data"], Args), {ok, RemoteIP} = rvi_common:get_json_element(["remote_ip"], Args), {ok, RemotePort} = rvi_common:get_json_element(["remote_port"], Args), - gen_server:cast(?SERVER, { rvi, receive_message, [Data, RemoteIP, RemotePort]}), + gen_server:cast(?SERVER, { rvi, receive_message, [Data, + RemoteIP, + RemotePort, + LogId]}), ok; handle_notification(Other, _Args) -> @@ -117,7 +123,8 @@ handle_call({rvi, send_message, ProtoOpts, DataLinkMod, DataLinkOpts, - Parameters]}, _From, St) -> + Parameters + | LogId]}, _From, St) -> ?debug(" protocol:send(): transaction id: ~p~n", [TID]), ?debug(" protocol:send(): service name: ~p~n", [ServiceName]), ?debug(" protocol:send(): timeout: ~p~n", [Timeout]), @@ -139,7 +146,7 @@ handle_call(Other, _From, St) -> %% Convert list-based data to binary. -handle_cast({rvi, receive_message, [Payload, IP, Port]} = Msg, St) -> +handle_cast({rvi, receive_message, [Payload, IP, Port | LogId]} = Msg, St) -> ?debug("~p:handle_cast(~p)", [?MODULE, Msg]), {ok, Elems} = msgpack:unpack(Payload, St#st.pack_opts), diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl index 962252c..067485c 100644 --- a/components/rvi_common/src/rvi_log.erl +++ b/components/rvi_common/src/rvi_log.erl @@ -357,7 +357,7 @@ purge_id() -> '$end_of_table' -> %% Should not be possible ... ok; - {Tid} -> + Tid -> ets:delete(?IDS, Tid), ets:match_delete(?EVENTS, #evt{id = {Tid,'_'}, _ = '_'}) end. diff --git a/components/service_edge/src/wse_server.erl b/components/service_edge/src/wse_server.erl index e9897a7..3ce1598 100644 --- a/components/service_edge/src/wse_server.erl +++ b/components/service_edge/src/wse_server.erl @@ -47,7 +47,7 @@ connection, %% 'Connection' key, %% "Sec-WebSocket-Key" protocol, %% "Sec-WebSocket-Protocol" - origin, %% + origin, %% version, %% "Sec-WebSocket-Version" cookie, %% 'Cookie' hs = [] @@ -77,10 +77,10 @@ %% but is included here to make the example self-contained -start(Port, M, F, A) when is_integer(Port) -> +start(Port, M, F, A) when is_integer(Port) -> start_([{cb, {M,F,A}}, {port,Port}]). -start(Port,M,F,A, Opts) when is_integer(Port) -> +start(Port,M,F,A, Opts) when is_integer(Port) -> start_([{port,Port}, {cb, {M,F,A}}] ++ Opts). start_(Opts) -> spawn(fun() -> init(Opts) end). @@ -88,7 +88,7 @@ start_(Opts) -> spawn(fun() -> init(Opts) end). stop(RegName) when is_atom(RegName) -> RegName ! stop. - + init(Opts) -> Port = proplists:get_value(port, Opts, ?WSE_DEFAULT_PORT), @@ -130,7 +130,7 @@ accept_loop(Listen,Opts,Pid) -> gen_tcp:close(Listen), exit(stopped) end. - + accept(Parent, Listen, Opts) -> ?debug("Accept ~p\n", [Listen]), case gen_tcp:accept(Listen) of @@ -146,7 +146,7 @@ accept(Parent, Listen, Opts) -> send(Pid, Data) -> - try + try Pid ! { send, Data }, ok catch @@ -157,7 +157,7 @@ send(Pid, Data) -> close(Pid) -> - try + try Pid ! close, ok catch @@ -284,7 +284,6 @@ ws_loop(Buf, Socket, S) -> receive %% WebSocket stuff {tcp, Socket, Data} -> - ?debug("tcp ~w: ~p", [Socket, Data]), ws_data(Buf, Data, Socket, S); {tcp_closed, Socket} -> @@ -293,14 +292,14 @@ ws_loop(Buf, Socket, S) -> {'EXIT',Pid,Reason} -> case get(parent) of - Pid -> + Pid -> ?debug("exit from parent ~w reason=~p\n", [Pid, Reason]), exit(Reason); _ -> ?debug("exit from ~w reason=~p\n", [Pid, Reason]), ws_loop(Buf, Socket, S) end; - + Message -> ?debug("handle_local: ~p - ~p", [Message, S]), case handle_local(Message, Socket, S) of @@ -313,23 +312,20 @@ ws_loop(Buf, Socket, S) -> end end. - + ws_data(Buf, Data, Socket, S) -> case <<Buf/binary, Data/binary>> of %% masked data <<Fin:1,_Rsv:3,Op:4,1:1,126:7,L:16,M:4/binary,Frag:L/binary,Buf1/binary>> -> - ?debug("unmask fragment: mask=~p, frag=~p", [M, Frag]), Frag1 = ws_mask(M, Frag), S1 = ws_fragment(Socket, Fin, Op, Frag1, S), ws_data(Buf1, <<>>, Socket, S1); <<Fin:1,_Rsv:3,Op:4,1:1,127:7,L:64,M:4/binary,Frag:L/binary,Buf1/binary>> -> - ?debug("unmask fragment: mask=~p, frag=~p", [M, Frag]), Frag1 = ws_mask(M, Frag), S1 = ws_fragment(Socket,Fin, Op, Frag1, S), ws_data(Buf1, <<>>, Socket, S1); <<Fin:1,_Rsv:3,Op:4,1:1,L:7,M:4/binary,Frag:L/binary,Buf1/binary>> -> - ?debug("unmask fragment: mask=~p, frag=~p", [M, Frag]), Frag1 = ws_mask(M, Frag), S1 = ws_fragment(Socket,Fin, Op, Frag1, S), ws_data(Buf1, <<>>, Socket, S1); @@ -364,9 +360,7 @@ ws_mask(<<M:32>>, Frag) -> ws_fragment(Socket,1, Op, Frag, S) -> Payload = iolist_to_binary(lists:reverse([Frag|S#s.fs])), - ?debug("op=~w, unmasked payload = ~p", [ws_opcode(Op),Payload]), Message = ws_decode(Payload,Op), - ?debug("handle_remote: ~p", [Message]), handle_remote(Message, Socket, S#s { fs=[] }); ws_fragment(_Socket, 0, _Op, Frag, S) -> @@ -405,7 +399,7 @@ ws_make_frame(Fin, Op, Mask, Data) -> <<Fin:1,0:3,Op:4,M:1,127:7,L:64,Mask/binary,Data/binary>> end. - + handle_local({ send,Data},Socket,S0) -> ?debug("wse_server:send(): ~p", [ Data]), gen_tcp:send(Socket, ws_make_server_frame(Data, S0#s.type)), @@ -492,7 +486,7 @@ stop_pong_timer(S0) -> if is_reference(Tmr) -> erlang:cancel_timer(Tmr), receive - {timeout,Tmr,pong} -> + {timeout,Tmr,pong} -> ok after 0 -> ok |