diff options
Diffstat (limited to 'lib/ssl/test/inet_crypto_dist.erl')
-rw-r--r-- | lib/ssl/test/inet_crypto_dist.erl | 694 |
1 files changed, 422 insertions, 272 deletions
diff --git a/lib/ssl/test/inet_crypto_dist.erl b/lib/ssl/test/inet_crypto_dist.erl index 58a303f48c..902bb5c252 100644 --- a/lib/ssl/test/inet_crypto_dist.erl +++ b/lib/ssl/test/inet_crypto_dist.erl @@ -29,7 +29,7 @@ -define(DRIVER, inet_tcp). -define(FAMILY, inet). --export([listen/1, accept/1, accept_connection/5, +-export([supported/0, listen/1, accept/1, accept_connection/5, setup/5, close/1, select/1, is_node_name/1]). %% Generalized dist API, for sibling IPv6 module inet6_crypto_dist @@ -51,11 +51,23 @@ %% ------------------------------------------------------------------------- +%% The curve choice greatly affects setup time, +%% we really want an Edwards curve but that would +%% require a very new openssl version. +%% Twisted brainpool curves (*t1) are faster than +%% non-twisted (*r1), 256 is much faster than 384, +%% and so on... +%%% -define(CURVE, brainpoolP384t1). +%%% -define(CURVE, brainpoolP256t1). +-define(CURVE, secp256r1). +-define(CIPHER, aes_gcm). +-define(HMAC, sha256). + -record(params, {socket, dist_handle, - hmac_algorithm = sha256, - aead_cipher = aes_gcm, + hmac_algorithm = ?HMAC, + aead_cipher = ?CIPHER, rekey_key, iv = 12, key = 16, @@ -71,20 +83,29 @@ params(Socket) -> -record(key_pair, {type = ecdh, - %% The curve choice greatly affects setup time, - %% we really want an Edwards curve but that would - %% require a very new openssl version. - %% Twisted brainpool curves (*t1) are faster than - %% non-twisted (*r1), 256 is much faster than 384, - %% and so on... -%%% params = brainpoolP384t1, - params = brainpoolP256t1, + params = ?CURVE, public, private, life_time = 3600000, % 1 hour life_count = 256 % Number of connection setups }). +supported() -> + Curve = lists:member(?CURVE, crypto:supports(curves)), + Cipher = lists:member(?CIPHER, crypto:supports(ciphers)), + Hmac = + lists:member(hmac, crypto:supports(macs)) andalso + lists:member(?HMAC, crypto:supports(hashs)), + if + not Curve -> + "curve " ++ atom_to_list(?CURVE); + not Cipher -> + "cipher " ++ atom_to_list(?CIPHER); + not Hmac -> + "HMAC " ++ atom_to_list(?HMAC); + true -> + ok + end. %% ------------------------------------------------------------------------- %% Keep the node's public/private key pair in the process state @@ -95,6 +116,7 @@ params(Socket) -> start_key_pair_server() -> monitor_dist_proc( + key_pair_server, spawn_link( fun () -> register(?MODULE, self()), @@ -354,6 +376,7 @@ gen_accept(Listen, Driver) -> %% Spawn Acceptor process %% monitor_dist_proc( + acceptor, spawn_opt( fun () -> start_key_pair_server(), @@ -424,6 +447,7 @@ gen_accept_connection( %% Spawn Controller/handshaker/ticker process %% monitor_dist_proc( + accept_controller, spawn_opt( fun() -> do_accept( @@ -466,6 +490,7 @@ gen_setup(Node, Type, MyNode, LongOrShortNames, SetupTime, Driver) -> %% Spawn Controller/handshaker/ticker process %% monitor_dist_proc( + setup_controller, spawn_opt( setup_fun( Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel), @@ -796,6 +821,7 @@ start_dist_ctrl(Socket, Timeout) -> Controller = self(), Server = monitor_dist_proc( + output_handler, spawn_opt( fun () -> receive @@ -937,7 +963,12 @@ init_recv( RecvParams_1#params{iv = {IV2BSalt, IV2BNo}}} end catch - error : Reason : Stacktrace-> + Class : Reason : Stacktrace when Class =:= error -> + error_logger:info_report( + [init_recv_exception, + {class, Class}, + {reason, Reason}, + {stacktrace, Stacktrace}]), _ = trace({Reason, Stacktrace}), exit(connection_closed) end. @@ -960,8 +991,8 @@ init_msg( MsgLen = byte_size(R1A) + TagLen + iolist_size(Plaintext), AAD = [<<MsgLen:32>>, R1A], {Ciphertext, Tag} = - crypto:crypto_one_time(AeadCipher, Key1A, IV1A, - {AAD, Plaintext, TagLen}, true), + crypto:crypto_one_time_aead( + AeadCipher, Key1A, IV1A, Plaintext, AAD, TagLen, true), Msg = [R1A, Tag, Ciphertext], {R2A, R3A, Msg}. %% @@ -982,8 +1013,8 @@ init_msg( MsgLen = byte_size(Msg), AAD = [<<MsgLen:32>>, R1B], case - crypto:crypto_one_time( - AeadCipher, Key1B, IV1B, {AAD, Ciphertext, Tag}, false) + crypto:crypto_one_time_aead( + AeadCipher, Key1B, IV1B, Ciphertext, AAD, Tag, false) of <<R2B:RLen/binary, R3B:RLen/binary, PubKeyB/binary>> -> SharedSecret = compute_shared_secret(KeyPair, PubKeyB), @@ -1000,8 +1031,9 @@ init_msg( StartMsgLen = TagLen + iolist_size(StartCleartext), StartAAD = <<StartMsgLen:32>>, {StartCiphertext, StartTag} = - crypto:crypto_one_time(AeadCipher, Key2A, IV2A, - {StartAAD, StartCleartext, TagLen}, true), + crypto:crypto_one_time_aead( + AeadCipher, Key2A, IV2A, + StartCleartext, StartAAD, TagLen, true), StartMsg = [StartTag, StartCiphertext], %% {Key2B, IV2B} = @@ -1032,8 +1064,8 @@ start_msg( MsgLen = byte_size(Msg), AAD = <<MsgLen:32>>, case - crypto:crypto_one_time( - AeadCipher, Key2B, IV2B, {AAD, Ciphertext, Tag}, false) + crypto:crypto_one_time_aead( + AeadCipher, Key2B, IV2B, Ciphertext, AAD, Tag, false) of <<R2A:RLen/binary, R3A:RLen/binary, RekeyCountB:32>> when RekeyCountA =< (RekeyCountB bsl 2), @@ -1044,7 +1076,7 @@ start_msg( hmac_key_iv(HmacAlgo, MacKey, Data, KeyLen, IVLen) -> <<Key:KeyLen/binary, IV:IVLen/binary>> = - crypto:macN(HmacAlgo, MacKey, Data, KeyLen + IVLen), + crypto:macN(hmac, HmacAlgo, MacKey, Data, KeyLen + IVLen), {Key, IV}. %% ------------------------------------------------------------------------- @@ -1063,20 +1095,14 @@ handshake( {?MODULE, From, {handshake_complete, DistHandle}} -> InputHandler = monitor_dist_proc( + input_handler, spawn_opt( fun () -> link(Controller), receive DistHandle -> - ok = - inet:setopts( - Socket, - [{active, ?TCP_ACTIVE}, - nodelay()]), input_handler( - RecvParams#params{ - dist_handle = DistHandle}, - RecvSeq, empty_q(), infinity) + RecvParams, RecvSeq, DistHandle) end end, [link, @@ -1085,42 +1111,40 @@ handshake( {fullsweep_after, 0}])), _ = monitor(process, InputHandler), % For the benchmark test ok = gen_tcp:controlling_process(Socket, InputHandler), + false = erlang:dist_ctrl_set_opt(DistHandle, get_size, true), ok = erlang:dist_ctrl_input_handler(DistHandle, InputHandler), InputHandler ! DistHandle, - crypto:rand_seed_alg(crypto_cache), reply(From, ok), process_flag(priority, normal), - erlang:dist_ctrl_get_data_notification(DistHandle), - output_handler( - SendParams#params{ - dist_handle = DistHandle, - rekey_msg = start_rekey_timer(SendParams#params.rekey_time)}, - SendSeq); + output_handler(SendParams, SendSeq, DistHandle); %% {?MODULE, From, {send, Data}} -> - case + {SendParams_1, SendSeq_1, Result} = encrypt_and_send_chunk( - SendParams, SendSeq, [?HANDSHAKE_CHUNK, Data]) - of - {SendParams_1, SendSeq_1, ok} -> + SendParams, SendSeq, + [?HANDSHAKE_CHUNK, Data], 1 + iolist_size(Data)), + if + Result =:= ok -> reply(From, ok), handshake( SendParams_1, SendSeq_1, RecvParams, RecvSeq, Controller); - {_, _, Error} -> + true -> reply(From, {error, closed}), - death_row({send, trace(Error)}) + death_row({send, trace(Result)}) end; {?MODULE, From, recv} -> - case recv_and_decrypt_chunk(RecvParams, RecvSeq) of - {RecvParams_1, RecvSeq_1, {ok, _} = Reply} -> - reply(From, Reply), + {RecvParams_1, RecvSeq_1, Result} = + recv_and_decrypt_chunk(RecvParams, RecvSeq), + case Result of + {ok, _} -> + reply(From, Result), handshake( SendParams, SendSeq, RecvParams_1, RecvSeq_1, Controller); - {_, _, Error} -> - reply(From, Error), - death_row({recv, trace(Error)}) + {error, _} -> + reply(From, Result), + death_row({recv, trace(Result)}) end; {?MODULE, From, peername} -> reply(From, inet:peername(Socket)), @@ -1136,11 +1160,17 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) -> case decrypt_chunk(RecvParams, RecvSeq, Chunk) of <<?HANDSHAKE_CHUNK, Cleartext/binary>> -> {RecvParams, RecvSeq + 1, {ok, Cleartext}}; - OtherChunk when is_binary(OtherChunk) -> - {RecvParams, RecvSeq + 1, {error, decrypt_error}}; + UnknownChunk when is_binary(UnknownChunk) -> + error_logger:error_report( + [?FUNCTION_NAME, + {reason,unknown_chunk}]), + {RecvParams, RecvSeq + 1, {error, unknown_chunk}}; #params{} = RecvParams_1 -> recv_and_decrypt_chunk(RecvParams_1, 0); error -> + error_logger:error_report( + [?FUNCTION_NAME, + {reason,decrypt_error}]), {RecvParams, RecvSeq, {error, decrypt_error}} end; Error -> @@ -1150,9 +1180,38 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) -> %% ------------------------------------------------------------------------- %% Output handler process %% -%% The game here is to flush all dist_data and dist_tick messages, -%% prioritize dist_data over dist_tick, and to not use selective receive +%% Await an event about what to do; fetch dist data from the VM, +%% send a dist tick, or rekey outbound encryption parameters. +%% +%% In case we are overloaded and could get many accumulated +%% dist_data or dist_tick messages; make sure to flush all of them +%% before proceeding with what to do. But, do not use selective +%% receive since that does not perform well when there are +%% many messages in the process mailbox. + +%% Entry function +output_handler(Params, Seq, DistHandle) -> + try + _ = crypto:rand_seed_alg(crypto_cache), + erlang:dist_ctrl_get_data_notification(DistHandle), + output_handler( + Params#params{ + dist_handle = DistHandle, + rekey_msg = start_rekey_timer(Params#params.rekey_time)}, + Seq) + catch + Class : Reason : Stacktrace -> + error_logger:info_report( + [output_handler_exception, + {class, Class}, + {reason, Reason}, + {stacktrace, Stacktrace}]), + erlang:raise(Class, Reason, Stacktrace) + end. +%% Loop top +%% +%% State: lurking until any interesting message output_handler(Params, Seq) -> receive Msg -> @@ -1162,8 +1221,7 @@ output_handler(Params, Seq) -> dist_tick -> output_handler_tick(Params, Seq); _ when Msg =:= Params#params.rekey_msg -> - Params_1 = output_handler_rekey(Params, Seq), - output_handler(Params_1, 0); + output_handler_rekey(Params, Seq); _ -> %% Ignore _ = trace(Msg), @@ -1171,6 +1229,7 @@ output_handler(Params, Seq) -> end end. +%% State: we have received at least one dist_data message output_handler_data(Params, Seq) -> receive Msg -> @@ -1180,21 +1239,19 @@ output_handler_data(Params, Seq) -> dist_tick -> output_handler_data(Params, Seq); _ when Msg =:= Params#params.rekey_msg -> - Params_1 = output_handler_rekey(Params, Seq), - output_handler_data(Params_1, 0); + output_handler_rekey(Params, Seq); _ -> %% Ignore _ = trace(Msg), output_handler_data(Params, Seq) end after 0 -> - DistHandle = Params#params.dist_handle, - Q = get_data(DistHandle, empty_q()), - {Params_1, Seq_1} = output_handler_send(Params, Seq, Q), - erlang:dist_ctrl_get_data_notification(DistHandle), + {Params_1, Seq_1} = output_handler_xfer(Params, Seq), + erlang:dist_ctrl_get_data_notification(Params#params.dist_handle), output_handler(Params_1, Seq_1) end. +%% State: we have received at least one dist_tick but no dist_data message output_handler_tick(Params, Seq) -> receive Msg -> @@ -1204,8 +1261,7 @@ output_handler_tick(Params, Seq) -> dist_tick -> output_handler_tick(Params, Seq); _ when Msg =:= Params#params.rekey_msg -> - Params_1 = output_handler_rekey(Params, Seq), - output_handler(Params_1, 0); + output_handler_rekey(Params, Seq); _ -> %% Ignore _ = trace(Msg), @@ -1214,185 +1270,273 @@ output_handler_tick(Params, Seq) -> after 0 -> TickSize = 7 + rand:uniform(56), TickData = binary:copy(<<0>>, TickSize), - case - encrypt_and_send_chunk(Params, Seq, [?TICK_CHUNK, TickData]) - of - {Params_1, Seq_1, ok} -> + {Params_1, Seq_1, Result} = + encrypt_and_send_chunk( + Params, Seq, [?TICK_CHUNK, TickData], 1 + TickSize), + if + Result =:= ok -> output_handler(Params_1, Seq_1); - {_, _, Error} -> - _ = trace(Error), - death_row() + true -> + death_row({send_tick, trace(Result)}) end end. output_handler_rekey(Params, Seq) -> case encrypt_and_send_rekey_chunk(Params, Seq) of #params{} = Params_1 -> - Params_1; + output_handler(Params_1, 0); SendError -> - _ = trace(SendError), - death_row() + death_row({send_rekey, trace(SendError)}) end. -output_handler_send(Params, Seq, {_, Size, _} = Q) -> - if - ?CHUNK_SIZE < Size -> - output_handler_deq_send(Params, Seq, Q, ?CHUNK_SIZE); - true -> - case get_data(Params#params.dist_handle, Q) of - {_, 0, _} -> + +%% Get outbound data from VM; encrypt and send, +%% until the VM has no more +%% +output_handler_xfer(Params, Seq) -> + output_handler_xfer(Params, Seq, [], 0, []). +%% +%% Front,Size,Rear is an Okasaki queue of binaries with total byte Size +%% +output_handler_xfer(Params, Seq, Front, Size, Rear) + when ?CHUNK_SIZE =< Size -> + %% + %% We have a full chunk or more + %% -> collect one chunk or less and send + output_handler_collect(Params, Seq, Front, Size, Rear); +output_handler_xfer(Params, Seq, Front, Size, Rear) -> + %% when Size < ?CHUNK_SIZE -> + %% + %% We do not have a full chunk -> try to fetch more from VM + case erlang:dist_ctrl_get_data(Params#params.dist_handle) of + none -> + if + Size =:= 0 -> + %% No more data from VM, nothing buffered + %% -> go back to lurking {Params, Seq}; - {_, Size, _} = Q_1 -> % Got no more - output_handler_deq_send(Params, Seq, Q_1, Size); - Q_1 -> - output_handler_send(Params, Seq, Q_1) - end + true -> + %% The VM had no more -> send what we have + output_handler_collect(Params, Seq, Front, Size, Rear) + end; + {Len,Iov} -> + output_handler_enq( + Params, Seq, Front, Size + 4 + Len, [<<Len:32>>|Rear], Iov) end. -output_handler_deq_send(Params, Seq, Q, Size) -> - {Cleartext, Q_1} = deq_iovec(Size, Q), - case - encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK, Cleartext]) - of - {Params_1, Seq_1, ok} -> - output_handler_send(Params_1, Seq_1, Q_1); - {_, _, Error} -> - _ = trace(Error), - death_row() +%% Enqueue VM data while splitting large binaries into ?CHUNK_SIZE +%% +output_handler_enq(Params, Seq, Front, Size, Rear, []) -> + output_handler_xfer(Params, Seq, Front, Size, Rear); +output_handler_enq(Params, Seq, Front, Size, Rear, [Bin|Iov]) -> + output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin). +%% +output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin) -> + BinSize = byte_size(Bin), + if + BinSize =< ?CHUNK_SIZE -> + output_handler_enq( + Params, Seq, Front, Size, [Bin|Rear], Iov); + true -> + <<Bin1:?CHUNK_SIZE/binary, Bin2/binary>> = Bin, + output_handler_enq( + Params, Seq, Front, Size, [Bin1|Rear], Iov, Bin2) end. -%% ------------------------------------------------------------------------- -%% Input handler process +%% Collect small binaries into chunks of at most ?CHUNK_SIZE %% -%% Here is T = 0|infinity to steer if we should try to receive -%% more data or not; start with infinity, and when we get some -%% data try with 0 to see if more is waiting - -input_handler(#params{socket = Socket} = Params, Seq, Q, T) -> - receive - Msg -> - case Msg of - {tcp_passive, Socket} -> - ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}]), - Q_1 = - case T of - 0 -> - deliver_data(Params#params.dist_handle, Q); - infinity -> - Q - end, - input_handler(Params, Seq, Q_1, infinity); - {tcp, Socket, Chunk} -> - input_chunk(Params, Seq, Q, T, Chunk); - {tcp_closed, Socket} -> - exit(connection_closed); - Other -> - %% Ignore... - _ = trace(Other), - input_handler(Params, Seq, Q, T) - end - after T -> - Q_1 = deliver_data(Params#params.dist_handle, Q), - input_handler(Params, Seq, Q_1, infinity) +output_handler_collect(Params, Seq, [], Zero, []) -> + 0 = Zero, % Assert + %% No more enqueued -> try to get more form VM + output_handler_xfer(Params, Seq); +output_handler_collect(Params, Seq, Front, Size, Rear) -> + output_handler_collect(Params, Seq, Front, Size, Rear, [], 0). +%% +output_handler_collect(Params, Seq, [], Zero, [], Acc, DataSize) -> + 0 = Zero, % Assert + output_handler_chunk(Params, Seq, [], Zero, [], Acc, DataSize); +output_handler_collect(Params, Seq, [], Size, Rear, Acc, DataSize) -> + %% Okasaki queue transfer Rear -> Front + output_handler_collect( + Params, Seq, lists:reverse(Rear), Size, [], Acc, DataSize); +output_handler_collect( + Params, Seq, [Bin|Iov] = Front, Size, Rear, Acc, DataSize) -> + BinSize = byte_size(Bin), + DataSize_1 = DataSize + BinSize, + if + ?CHUNK_SIZE < DataSize_1 -> + %% Bin does not fit in chunk -> send Acc + output_handler_chunk( + Params, Seq, Front, Size, Rear, Acc, DataSize); + DataSize_1 < ?CHUNK_SIZE -> + %% Chunk not full yet -> try to accumulate more + output_handler_collect( + Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1); + true -> % DataSize_1 == ?CHUNK_SIZE -> + %% Optimize one iteration; Bin fits exactly -> accumulate and send + output_handler_chunk( + Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1) end. -input_chunk(Params, Seq, Q, T, Chunk) -> - case decrypt_chunk(Params, Seq, Chunk) of - <<?DATA_CHUNK, Cleartext/binary>> -> - input_handler(Params, Seq + 1, enq_binary(Cleartext, Q), 0); - <<?TICK_CHUNK, _/binary>> -> - input_handler(Params, Seq + 1, Q, T); - OtherChunk when is_binary(OtherChunk) -> - _ = trace(invalid_chunk), - exit(connection_closed); - #params{} = Params_1 -> - input_handler(Params_1, 0, Q, T); - error -> - _ = trace(decrypt_error), - exit(connection_closed) +%% Encrypt and send a chunk +%% +output_handler_chunk(Params, Seq, Front, Size, Rear, Acc, DataSize) -> + Data = lists:reverse(Acc), + {Params_1, Seq_1, Result} = + encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK|Data], 1 + DataSize), + if + Result =:= ok -> + %% Try to collect another chunk + output_handler_collect(Params_1, Seq_1, Front, Size, Rear); + true -> + death_row({send_chunk, trace(Result)}) end. %% ------------------------------------------------------------------------- -%% erlang:dist_ctrl_* helpers +%% Input handler process +%% -%% Get data for sending from the VM and place it in a queue +%% Entry function +input_handler(#params{socket = Socket} = Params, Seq, DistHandle) -> + try + ok = + inet:setopts( + Socket, [{active, ?TCP_ACTIVE}, nodelay()]), + input_handler( + Params#params{dist_handle = DistHandle}, + Seq) + catch + Class : Reason : Stacktrace -> + error_logger:info_report( + [input_handler_exception, + {class, Class}, + {reason, Reason}, + {stacktrace, Stacktrace}]), + erlang:raise(Class, Reason, Stacktrace) + end. + +%% Loop top +input_handler(Params, Seq) -> + %% Shortcut into the loop + {Params_1, Seq_1, Data} = input_data(Params, Seq), + input_handler(Params_1, Seq_1, Data, [], byte_size(Data)). %% -get_data(DistHandle, {Front, Size, Rear}) -> - get_data(DistHandle, Front, Size, Rear). +input_handler(Params, Seq, First, Buffer, Size) -> + %% Size is size of First + Buffer + case First of + <<Packet1Size:32, Packet1:Packet1Size/binary, + Packet2Size:32, Packet2:Packet2Size/binary, Rest/binary>> -> + DistHandle = Params#params.dist_handle, + erlang:dist_ctrl_put_data(DistHandle, Packet1), + erlang:dist_ctrl_put_data(DistHandle, Packet2), + input_handler( + Params, Seq, Rest, + Buffer, Size - (8 + Packet1Size + Packet2Size)); + <<PacketSize:32, Packet:PacketSize/binary, Rest/binary>> -> + DistHandle = Params#params.dist_handle, + erlang:dist_ctrl_put_data(DistHandle, Packet), + input_handler( + Params, Seq, Rest, Buffer, Size - (4 + PacketSize)); + <<PacketSize:32, PacketStart/binary>> -> + %% Partial packet in First + input_handler( + Params, Seq, PacketStart, Buffer, Size - 4, PacketSize); + <<Bin/binary>> -> + %% Partial header in First + if + 4 =< Size -> + %% Complete header in First + Buffer + {First_1, Buffer_1, PacketSize} = + input_get_packet_size(Bin, lists:reverse(Buffer)), + input_handler( + Params, Seq, First_1, Buffer_1, Size - 4, PacketSize); + true -> + %% Incomplete header received so far + {Params_1, Seq_1, More} = input_data(Params, Seq), + input_handler( + Params_1, Seq_1, Bin, + [More|Buffer], Size + byte_size(More)) + end + end. %% -get_data(DistHandle, Front, Size, Rear) -> - case erlang:dist_ctrl_get_data(DistHandle) of - none -> - {Front, Size, Rear}; - Bin when is_binary(Bin) -> - Len = byte_size(Bin), - get_data( - DistHandle, Front, Size + 4 + Len, - [Bin, <<Len:32>>|Rear]); - [Bin1, Bin2] -> - Len = byte_size(Bin1) + byte_size(Bin2), - get_data( - DistHandle, Front, Size + 4 + Len, - [Bin2, Bin1, <<Len:32>>|Rear]); - Iovec -> - Len = iolist_size(Iovec), - get_data( - DistHandle, Front, Size + 4 + Len, - lists:reverse(Iovec, [<<Len:32>>|Rear])) +input_handler(Params, Seq, PacketStart, Buffer, Size, PacketSize) -> + %% Size is size of PacketStart + Buffer + RestSize = Size - PacketSize, + if + RestSize < 0 -> + %% Incomplete packet received so far + {Params_1, Seq_1, More} = input_data(Params, Seq), + input_handler( + Params_1, Seq_1, PacketStart, + [More|Buffer], Size + byte_size(More), PacketSize); + 0 < RestSize, Buffer =:= [] -> + %% Rest data in PacketStart + <<Packet:PacketSize/binary, Rest/binary>> = PacketStart, + DistHandle = Params#params.dist_handle, + erlang:dist_ctrl_put_data(DistHandle, Packet), + input_handler(Params, Seq, Rest, [], RestSize); + Buffer =:= [] -> % RestSize == 0 + %% No rest data + DistHandle = Params#params.dist_handle, + erlang:dist_ctrl_put_data(DistHandle, PacketStart), + input_handler(Params, Seq); + true -> + %% Split packet from rest data + LastBin = hd(Buffer), + <<PacketLast:(byte_size(LastBin) - RestSize)/binary, + Rest/binary>> = LastBin, + Packet = [PacketStart|lists:reverse(tl(Buffer), PacketLast)], + DistHandle = Params#params.dist_handle, + erlang:dist_ctrl_put_data(DistHandle, Packet), + input_handler(Params, Seq, Rest, [], RestSize) end. -%% De-packet and deliver received data to the VM from a queue -%% -deliver_data(DistHandle, Q) -> - case Q of - {[], Size, []} -> - Size = 0, % Assert - Q; - {[], Size, Rear} -> - [Bin|Front] = lists:reverse(Rear), - deliver_data(DistHandle, Front, Size, [], Bin); - {[Bin|Front], Size, Rear} -> - deliver_data(DistHandle, Front, Size, Rear, Bin) +input_get_packet_size(First, [Bin|Buffer]) -> + MissingSize = 4 - byte_size(First), + if + MissingSize =< byte_size(Bin) -> + <<Last:MissingSize/binary, Rest/binary>> = Bin, + <<PacketSize:32>> = <<First/binary, Last/binary>>, + {Rest, lists:reverse(Buffer), PacketSize}; + true -> + input_get_packet_size(<<First/binary, Bin/binary>>, Buffer) end. + +input_data(Params, Seq) -> + receive Msg -> input_data(Params, Seq, Msg) end. %% -deliver_data(DistHandle, Front, Size, Rear, Bin) -> - case Bin of - <<DataSizeA:32, DataA:DataSizeA/binary, - DataSizeB:32, DataB:DataSizeB/binary, Rest/binary>> -> - erlang:dist_ctrl_put_data(DistHandle, DataA), - erlang:dist_ctrl_put_data(DistHandle, DataB), - deliver_data( - DistHandle, - Front, Size - (4 + DataSizeA + 4 + DataSizeB), Rear, - Rest); - <<DataSize:32, Data:DataSize/binary, Rest/binary>> -> - erlang:dist_ctrl_put_data(DistHandle, Data), - deliver_data(DistHandle, Front, Size - (4 + DataSize), Rear, Rest); - <<DataSize:32, FirstData/binary>> -> - TotalSize = 4 + DataSize, - if - TotalSize =< Size -> - BinSize = byte_size(Bin), - {MoreData, Q} = - deq_iovec( - TotalSize - BinSize, - Front, Size - BinSize, Rear), - erlang:dist_ctrl_put_data(DistHandle, [FirstData|MoreData]), - deliver_data(DistHandle, Q); - true -> % Incomplete data - {[Bin|Front], Size, Rear} +input_data(#params{socket = Socket} = Params, Seq, Msg) -> + case Msg of + {tcp_passive, Socket} -> + ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}]), + input_data(Params, Seq); + {tcp, Socket, Ciphertext} -> + case decrypt_chunk(Params, Seq, Ciphertext) of + <<?DATA_CHUNK, Chunk/binary>> -> + {Params, Seq + 1, Chunk}; + <<?TICK_CHUNK, _Dummy/binary>> -> + input_data(Params, Seq + 1); + <<UnknownChunk/binary>> -> + error_logger:error_report( + [?FUNCTION_NAME, + {reason, unknown_chunk}]), + _ = trace(UnknownChunk), + exit(connection_closed); + #params{} = Params_1 -> + input_data(Params_1, 0); + error -> + _ = trace(decrypt_error), + exit(connection_closed) end; - <<_/binary>> -> - BinSize = byte_size(Bin), - if - 4 =< Size -> % Fragmented header - extract a header bin - {RestHeader, {Front_1, _Size_1, Rear_1}} = - deq_iovec(4 - BinSize, Front, Size - BinSize, Rear), - Header = iolist_to_binary([Bin|RestHeader]), - deliver_data(DistHandle, Front_1, Size, Rear_1, Header); - true -> % Incomplete header - {[Bin|Front], Size, Rear} - end + {tcp_closed = Reason, Socket} -> + error_logger:info_report( + [?FUNCTION_NAME, + {reason, Reason}]), + exit(connection_closed); + Other -> + %% Ignore... + _ = trace(Other), + input_data(Params, Seq) end. %% ------------------------------------------------------------------------- @@ -1400,20 +1544,23 @@ deliver_data(DistHandle, Front, Size, Rear, Bin) -> encrypt_and_send_chunk( #params{ - socket = Socket, rekey_count = Seq, rekey_msg = RekeyMsg} = Params, - Seq, Cleartext) -> + socket = Socket, rekey_count = RekeyCount, rekey_msg = RekeyMsg} = Params, + Seq, Cleartext, Size) when Seq =:= RekeyCount -> %% cancel_rekey_timer(RekeyMsg), case encrypt_and_send_rekey_chunk(Params, Seq) of #params{} = Params_1 -> Result = - gen_tcp:send(Socket, encrypt_chunk(Params, 0, Cleartext)), + gen_tcp:send( + Socket, encrypt_chunk(Params, 0, Cleartext, Size)), {Params_1, 1, Result}; SendError -> {Params, Seq + 1, SendError} end; -encrypt_and_send_chunk(#params{socket = Socket} = Params, Seq, Cleartext) -> - Result = gen_tcp:send(Socket, encrypt_chunk(Params, Seq, Cleartext)), +encrypt_and_send_chunk( + #params{socket = Socket} = Params, Seq, Cleartext, Size) -> + Result = + gen_tcp:send(Socket, encrypt_chunk(Params, Seq, Cleartext, Size)), {Params, Seq + 1, Result}. encrypt_and_send_rekey_chunk( @@ -1430,7 +1577,9 @@ encrypt_and_send_rekey_chunk( #key_pair{public = PubKeyA} = KeyPair = get_new_key_pair(), case gen_tcp:send( - Socket, encrypt_chunk(Params, Seq, [?REKEY_CHUNK, PubKeyA])) + Socket, + encrypt_chunk( + Params, Seq, [?REKEY_CHUNK, PubKeyA], 1 + byte_size(PubKeyA))) of ok -> SharedSecret = compute_shared_secret(KeyPair, PubKeyB), @@ -1445,18 +1594,19 @@ encrypt_and_send_rekey_chunk( SendError -> SendError end. - + encrypt_chunk( #params{ aead_cipher = AeadCipher, - iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen}, Seq, Cleartext) -> + iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen}, + Seq, Cleartext, Size) -> %% - ChunkLen = iolist_size(Cleartext) + TagLen, + ChunkLen = Size + TagLen, AAD = <<Seq:32, ChunkLen:32>>, IVBin = <<IVSalt/binary, (IVNo + Seq):48>>, {Ciphertext, CipherTag} = - crypto:crypto_one_time(AeadCipher, Key, IVBin, - {AAD, Cleartext, TagLen}, true), + crypto:crypto_one_time_aead( + AeadCipher, Key, IVBin, Cleartext, AAD, TagLen, true), Chunk = [Ciphertext,CipherTag], Chunk. @@ -1468,31 +1618,33 @@ decrypt_chunk( ChunkLen = byte_size(Chunk), if ChunkLen < TagLen -> + error_logger:error_report( + [?FUNCTION_NAME, + {reason,short_chunk}]), error; true -> AAD = <<Seq:32, ChunkLen:32>>, IVBin = <<IVSalt/binary, (IVNo + Seq):48>>, CiphertextLen = ChunkLen - TagLen, - case Chunk of - <<Ciphertext:CiphertextLen/binary, - CipherTag:TagLen/binary>> -> - block_decrypt( - Params, Seq, AeadCipher, Key, IVBin, - {AAD, Ciphertext, CipherTag}); - _ -> - error - end + <<Ciphertext:CiphertextLen/binary, + CipherTag:TagLen/binary>> = Chunk, + block_decrypt( + Params, Seq, AeadCipher, Key, IVBin, + Ciphertext, AAD, CipherTag) end. block_decrypt( #params{ rekey_key = #key_pair{public = PubKeyA} = KeyPair, rekey_count = RekeyCount} = Params, - Seq, AeadCipher, Key, IV, Data) -> - case crypto:crypto_one_time(AeadCipher, Key, IV, Data, false) of - <<?REKEY_CHUNK, Rest/binary>> -> + Seq, AeadCipher, Key, IV, Ciphertext, AAD, CipherTag) -> + case + crypto:crypto_one_time_aead( + AeadCipher, Key, IV, Ciphertext, AAD, CipherTag, false) + of + <<?REKEY_CHUNK, Chunk/binary>> -> PubKeyLen = byte_size(PubKeyA), - case Rest of + case Chunk of <<PubKeyB:PubKeyLen/binary>> -> SharedSecret = compute_shared_secret(KeyPair, PubKeyB), KeyLen = byte_size(Key), @@ -1504,56 +1656,50 @@ block_decrypt( SharedSecret, [Key, IV], KeyLen, IVLen), Params#params{iv = {IVSalt, IVNo}, key = Key_1}; _ -> + error_logger:error_report( + [?FUNCTION_NAME, + {reason,bad_rekey_chunk}]), error end; Chunk when is_binary(Chunk) -> case Seq of RekeyCount -> %% This was one chunk too many without rekeying + error_logger:error_report( + [?FUNCTION_NAME, + {reason,rekey_overdue}]), error; _ -> Chunk end; error -> + error_logger:error_report( + [?FUNCTION_NAME, + {reason,decrypt_error}]), error end. %% ------------------------------------------------------------------------- -%% Queue of binaries i.e an iovec queue - -empty_q() -> - {[], 0, []}. -enq_binary(Bin, {Front, Size, Rear}) -> - {Front, Size + byte_size(Bin), [Bin|Rear]}. +%% Wait for getting killed by process link, +%% and if that does not happen - drop dead -deq_iovec(GetSize, {Front, Size, Rear}) when GetSize =< Size -> - deq_iovec(GetSize, Front, Size, Rear, []). -%% -deq_iovec(GetSize, Front, Size, Rear) -> - deq_iovec(GetSize, Front, Size, Rear, []). -%% -deq_iovec(GetSize, [], Size, Rear, Acc) -> - deq_iovec(GetSize, lists:reverse(Rear), Size, [], Acc); -deq_iovec(GetSize, [Bin|Front], Size, Rear, Acc) -> - BinSize = byte_size(Bin), - if - BinSize < GetSize -> - deq_iovec( - GetSize - BinSize, Front, Size - BinSize, Rear, [Bin|Acc]); - GetSize < BinSize -> - {Bin1,Bin2} = erlang:split_binary(Bin, GetSize), - {lists:reverse(Acc, [Bin1]), {[Bin2|Front], Size - GetSize, Rear}}; - true -> - {lists:reverse(Acc, [Bin]), {Front, Size - BinSize, Rear}} +death_row(Reason) -> + error_logger:info_report( + [?FUNCTION_NAME, + {reason, Reason}, + {pid, self()}]), + receive + after 5000 -> + death_row_timeout(Reason) end. -%% ------------------------------------------------------------------------- - -death_row() -> death_row(connection_closed). -%% -death_row(normal) -> death_row(connection_closed); -death_row(Reason) -> receive after 5000 -> exit(Reason) end. +death_row_timeout(Reason) -> + error_logger:error_report( + [?FUNCTION_NAME, + {reason, Reason}, + {pid, self()}]), + exit(Reason). %% ------------------------------------------------------------------------- @@ -1561,23 +1707,27 @@ death_row(Reason) -> receive after 5000 -> exit(Reason) end. trace(Term) -> Term. %% Keep an eye on this Pid (debug) --ifndef(undefined). -monitor_dist_proc(Pid) -> +-ifdef(undefined). +monitor_dist_proc(_Tag, Pid) -> Pid. -else. -monitor_dist_proc(Pid) -> +monitor_dist_proc(Tag, Pid) -> spawn( fun () -> MRef = erlang:monitor(process, Pid), + error_logger:info_report( + [?FUNCTION_NAME, + {type, Tag}, + {pid, Pid}]), receive {'DOWN', MRef, _, _, normal} -> error_logger:error_report( - [dist_proc_died, + [?FUNCTION_NAME, {reason, normal}, {pid, Pid}]); {'DOWN', MRef, _, _, Reason} -> error_logger:info_report( - [dist_proc_died, + [?FUNCTION_NAME, {reason, Reason}, {pid, Pid}]) end |