summaryrefslogtreecommitdiff
path: root/lib/ssl/test/inet_crypto_dist.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ssl/test/inet_crypto_dist.erl')
-rw-r--r--lib/ssl/test/inet_crypto_dist.erl694
1 files changed, 422 insertions, 272 deletions
diff --git a/lib/ssl/test/inet_crypto_dist.erl b/lib/ssl/test/inet_crypto_dist.erl
index 58a303f48c..902bb5c252 100644
--- a/lib/ssl/test/inet_crypto_dist.erl
+++ b/lib/ssl/test/inet_crypto_dist.erl
@@ -29,7 +29,7 @@
-define(DRIVER, inet_tcp).
-define(FAMILY, inet).
--export([listen/1, accept/1, accept_connection/5,
+-export([supported/0, listen/1, accept/1, accept_connection/5,
setup/5, close/1, select/1, is_node_name/1]).
%% Generalized dist API, for sibling IPv6 module inet6_crypto_dist
@@ -51,11 +51,23 @@
%% -------------------------------------------------------------------------
+%% The curve choice greatly affects setup time,
+%% we really want an Edwards curve but that would
+%% require a very new openssl version.
+%% Twisted brainpool curves (*t1) are faster than
+%% non-twisted (*r1), 256 is much faster than 384,
+%% and so on...
+%%% -define(CURVE, brainpoolP384t1).
+%%% -define(CURVE, brainpoolP256t1).
+-define(CURVE, secp256r1).
+-define(CIPHER, aes_gcm).
+-define(HMAC, sha256).
+
-record(params,
{socket,
dist_handle,
- hmac_algorithm = sha256,
- aead_cipher = aes_gcm,
+ hmac_algorithm = ?HMAC,
+ aead_cipher = ?CIPHER,
rekey_key,
iv = 12,
key = 16,
@@ -71,20 +83,29 @@ params(Socket) ->
-record(key_pair,
{type = ecdh,
- %% The curve choice greatly affects setup time,
- %% we really want an Edwards curve but that would
- %% require a very new openssl version.
- %% Twisted brainpool curves (*t1) are faster than
- %% non-twisted (*r1), 256 is much faster than 384,
- %% and so on...
-%%% params = brainpoolP384t1,
- params = brainpoolP256t1,
+ params = ?CURVE,
public,
private,
life_time = 3600000, % 1 hour
life_count = 256 % Number of connection setups
}).
+supported() ->
+ Curve = lists:member(?CURVE, crypto:supports(curves)),
+ Cipher = lists:member(?CIPHER, crypto:supports(ciphers)),
+ Hmac =
+ lists:member(hmac, crypto:supports(macs)) andalso
+ lists:member(?HMAC, crypto:supports(hashs)),
+ if
+ not Curve ->
+ "curve " ++ atom_to_list(?CURVE);
+ not Cipher ->
+ "cipher " ++ atom_to_list(?CIPHER);
+ not Hmac ->
+ "HMAC " ++ atom_to_list(?HMAC);
+ true ->
+ ok
+ end.
%% -------------------------------------------------------------------------
%% Keep the node's public/private key pair in the process state
@@ -95,6 +116,7 @@ params(Socket) ->
start_key_pair_server() ->
monitor_dist_proc(
+ key_pair_server,
spawn_link(
fun () ->
register(?MODULE, self()),
@@ -354,6 +376,7 @@ gen_accept(Listen, Driver) ->
%% Spawn Acceptor process
%%
monitor_dist_proc(
+ acceptor,
spawn_opt(
fun () ->
start_key_pair_server(),
@@ -424,6 +447,7 @@ gen_accept_connection(
%% Spawn Controller/handshaker/ticker process
%%
monitor_dist_proc(
+ accept_controller,
spawn_opt(
fun() ->
do_accept(
@@ -466,6 +490,7 @@ gen_setup(Node, Type, MyNode, LongOrShortNames, SetupTime, Driver) ->
%% Spawn Controller/handshaker/ticker process
%%
monitor_dist_proc(
+ setup_controller,
spawn_opt(
setup_fun(
Node, Type, MyNode, LongOrShortNames, SetupTime, Driver, NetKernel),
@@ -796,6 +821,7 @@ start_dist_ctrl(Socket, Timeout) ->
Controller = self(),
Server =
monitor_dist_proc(
+ output_handler,
spawn_opt(
fun () ->
receive
@@ -937,7 +963,12 @@ init_recv(
RecvParams_1#params{iv = {IV2BSalt, IV2BNo}}}
end
catch
- error : Reason : Stacktrace->
+ Class : Reason : Stacktrace when Class =:= error ->
+ error_logger:info_report(
+ [init_recv_exception,
+ {class, Class},
+ {reason, Reason},
+ {stacktrace, Stacktrace}]),
_ = trace({Reason, Stacktrace}),
exit(connection_closed)
end.
@@ -960,8 +991,8 @@ init_msg(
MsgLen = byte_size(R1A) + TagLen + iolist_size(Plaintext),
AAD = [<<MsgLen:32>>, R1A],
{Ciphertext, Tag} =
- crypto:crypto_one_time(AeadCipher, Key1A, IV1A,
- {AAD, Plaintext, TagLen}, true),
+ crypto:crypto_one_time_aead(
+ AeadCipher, Key1A, IV1A, Plaintext, AAD, TagLen, true),
Msg = [R1A, Tag, Ciphertext],
{R2A, R3A, Msg}.
%%
@@ -982,8 +1013,8 @@ init_msg(
MsgLen = byte_size(Msg),
AAD = [<<MsgLen:32>>, R1B],
case
- crypto:crypto_one_time(
- AeadCipher, Key1B, IV1B, {AAD, Ciphertext, Tag}, false)
+ crypto:crypto_one_time_aead(
+ AeadCipher, Key1B, IV1B, Ciphertext, AAD, Tag, false)
of
<<R2B:RLen/binary, R3B:RLen/binary, PubKeyB/binary>> ->
SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
@@ -1000,8 +1031,9 @@ init_msg(
StartMsgLen = TagLen + iolist_size(StartCleartext),
StartAAD = <<StartMsgLen:32>>,
{StartCiphertext, StartTag} =
- crypto:crypto_one_time(AeadCipher, Key2A, IV2A,
- {StartAAD, StartCleartext, TagLen}, true),
+ crypto:crypto_one_time_aead(
+ AeadCipher, Key2A, IV2A,
+ StartCleartext, StartAAD, TagLen, true),
StartMsg = [StartTag, StartCiphertext],
%%
{Key2B, IV2B} =
@@ -1032,8 +1064,8 @@ start_msg(
MsgLen = byte_size(Msg),
AAD = <<MsgLen:32>>,
case
- crypto:crypto_one_time(
- AeadCipher, Key2B, IV2B, {AAD, Ciphertext, Tag}, false)
+ crypto:crypto_one_time_aead(
+ AeadCipher, Key2B, IV2B, Ciphertext, AAD, Tag, false)
of
<<R2A:RLen/binary, R3A:RLen/binary, RekeyCountB:32>>
when RekeyCountA =< (RekeyCountB bsl 2),
@@ -1044,7 +1076,7 @@ start_msg(
hmac_key_iv(HmacAlgo, MacKey, Data, KeyLen, IVLen) ->
<<Key:KeyLen/binary, IV:IVLen/binary>> =
- crypto:macN(HmacAlgo, MacKey, Data, KeyLen + IVLen),
+ crypto:macN(hmac, HmacAlgo, MacKey, Data, KeyLen + IVLen),
{Key, IV}.
%% -------------------------------------------------------------------------
@@ -1063,20 +1095,14 @@ handshake(
{?MODULE, From, {handshake_complete, DistHandle}} ->
InputHandler =
monitor_dist_proc(
+ input_handler,
spawn_opt(
fun () ->
link(Controller),
receive
DistHandle ->
- ok =
- inet:setopts(
- Socket,
- [{active, ?TCP_ACTIVE},
- nodelay()]),
input_handler(
- RecvParams#params{
- dist_handle = DistHandle},
- RecvSeq, empty_q(), infinity)
+ RecvParams, RecvSeq, DistHandle)
end
end,
[link,
@@ -1085,42 +1111,40 @@ handshake(
{fullsweep_after, 0}])),
_ = monitor(process, InputHandler), % For the benchmark test
ok = gen_tcp:controlling_process(Socket, InputHandler),
+ false = erlang:dist_ctrl_set_opt(DistHandle, get_size, true),
ok = erlang:dist_ctrl_input_handler(DistHandle, InputHandler),
InputHandler ! DistHandle,
- crypto:rand_seed_alg(crypto_cache),
reply(From, ok),
process_flag(priority, normal),
- erlang:dist_ctrl_get_data_notification(DistHandle),
- output_handler(
- SendParams#params{
- dist_handle = DistHandle,
- rekey_msg = start_rekey_timer(SendParams#params.rekey_time)},
- SendSeq);
+ output_handler(SendParams, SendSeq, DistHandle);
%%
{?MODULE, From, {send, Data}} ->
- case
+ {SendParams_1, SendSeq_1, Result} =
encrypt_and_send_chunk(
- SendParams, SendSeq, [?HANDSHAKE_CHUNK, Data])
- of
- {SendParams_1, SendSeq_1, ok} ->
+ SendParams, SendSeq,
+ [?HANDSHAKE_CHUNK, Data], 1 + iolist_size(Data)),
+ if
+ Result =:= ok ->
reply(From, ok),
handshake(
SendParams_1, SendSeq_1, RecvParams, RecvSeq,
Controller);
- {_, _, Error} ->
+ true ->
reply(From, {error, closed}),
- death_row({send, trace(Error)})
+ death_row({send, trace(Result)})
end;
{?MODULE, From, recv} ->
- case recv_and_decrypt_chunk(RecvParams, RecvSeq) of
- {RecvParams_1, RecvSeq_1, {ok, _} = Reply} ->
- reply(From, Reply),
+ {RecvParams_1, RecvSeq_1, Result} =
+ recv_and_decrypt_chunk(RecvParams, RecvSeq),
+ case Result of
+ {ok, _} ->
+ reply(From, Result),
handshake(
SendParams, SendSeq, RecvParams_1, RecvSeq_1,
Controller);
- {_, _, Error} ->
- reply(From, Error),
- death_row({recv, trace(Error)})
+ {error, _} ->
+ reply(From, Result),
+ death_row({recv, trace(Result)})
end;
{?MODULE, From, peername} ->
reply(From, inet:peername(Socket)),
@@ -1136,11 +1160,17 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) ->
case decrypt_chunk(RecvParams, RecvSeq, Chunk) of
<<?HANDSHAKE_CHUNK, Cleartext/binary>> ->
{RecvParams, RecvSeq + 1, {ok, Cleartext}};
- OtherChunk when is_binary(OtherChunk) ->
- {RecvParams, RecvSeq + 1, {error, decrypt_error}};
+ UnknownChunk when is_binary(UnknownChunk) ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,unknown_chunk}]),
+ {RecvParams, RecvSeq + 1, {error, unknown_chunk}};
#params{} = RecvParams_1 ->
recv_and_decrypt_chunk(RecvParams_1, 0);
error ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,decrypt_error}]),
{RecvParams, RecvSeq, {error, decrypt_error}}
end;
Error ->
@@ -1150,9 +1180,38 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) ->
%% -------------------------------------------------------------------------
%% Output handler process
%%
-%% The game here is to flush all dist_data and dist_tick messages,
-%% prioritize dist_data over dist_tick, and to not use selective receive
+%% Await an event about what to do; fetch dist data from the VM,
+%% send a dist tick, or rekey outbound encryption parameters.
+%%
+%% In case we are overloaded and could get many accumulated
+%% dist_data or dist_tick messages; make sure to flush all of them
+%% before proceeding with what to do. But, do not use selective
+%% receive since that does not perform well when there are
+%% many messages in the process mailbox.
+
+%% Entry function
+output_handler(Params, Seq, DistHandle) ->
+ try
+ _ = crypto:rand_seed_alg(crypto_cache),
+ erlang:dist_ctrl_get_data_notification(DistHandle),
+ output_handler(
+ Params#params{
+ dist_handle = DistHandle,
+ rekey_msg = start_rekey_timer(Params#params.rekey_time)},
+ Seq)
+ catch
+ Class : Reason : Stacktrace ->
+ error_logger:info_report(
+ [output_handler_exception,
+ {class, Class},
+ {reason, Reason},
+ {stacktrace, Stacktrace}]),
+ erlang:raise(Class, Reason, Stacktrace)
+ end.
+%% Loop top
+%%
+%% State: lurking until any interesting message
output_handler(Params, Seq) ->
receive
Msg ->
@@ -1162,8 +1221,7 @@ output_handler(Params, Seq) ->
dist_tick ->
output_handler_tick(Params, Seq);
_ when Msg =:= Params#params.rekey_msg ->
- Params_1 = output_handler_rekey(Params, Seq),
- output_handler(Params_1, 0);
+ output_handler_rekey(Params, Seq);
_ ->
%% Ignore
_ = trace(Msg),
@@ -1171,6 +1229,7 @@ output_handler(Params, Seq) ->
end
end.
+%% State: we have received at least one dist_data message
output_handler_data(Params, Seq) ->
receive
Msg ->
@@ -1180,21 +1239,19 @@ output_handler_data(Params, Seq) ->
dist_tick ->
output_handler_data(Params, Seq);
_ when Msg =:= Params#params.rekey_msg ->
- Params_1 = output_handler_rekey(Params, Seq),
- output_handler_data(Params_1, 0);
+ output_handler_rekey(Params, Seq);
_ ->
%% Ignore
_ = trace(Msg),
output_handler_data(Params, Seq)
end
after 0 ->
- DistHandle = Params#params.dist_handle,
- Q = get_data(DistHandle, empty_q()),
- {Params_1, Seq_1} = output_handler_send(Params, Seq, Q),
- erlang:dist_ctrl_get_data_notification(DistHandle),
+ {Params_1, Seq_1} = output_handler_xfer(Params, Seq),
+ erlang:dist_ctrl_get_data_notification(Params#params.dist_handle),
output_handler(Params_1, Seq_1)
end.
+%% State: we have received at least one dist_tick but no dist_data message
output_handler_tick(Params, Seq) ->
receive
Msg ->
@@ -1204,8 +1261,7 @@ output_handler_tick(Params, Seq) ->
dist_tick ->
output_handler_tick(Params, Seq);
_ when Msg =:= Params#params.rekey_msg ->
- Params_1 = output_handler_rekey(Params, Seq),
- output_handler(Params_1, 0);
+ output_handler_rekey(Params, Seq);
_ ->
%% Ignore
_ = trace(Msg),
@@ -1214,185 +1270,273 @@ output_handler_tick(Params, Seq) ->
after 0 ->
TickSize = 7 + rand:uniform(56),
TickData = binary:copy(<<0>>, TickSize),
- case
- encrypt_and_send_chunk(Params, Seq, [?TICK_CHUNK, TickData])
- of
- {Params_1, Seq_1, ok} ->
+ {Params_1, Seq_1, Result} =
+ encrypt_and_send_chunk(
+ Params, Seq, [?TICK_CHUNK, TickData], 1 + TickSize),
+ if
+ Result =:= ok ->
output_handler(Params_1, Seq_1);
- {_, _, Error} ->
- _ = trace(Error),
- death_row()
+ true ->
+ death_row({send_tick, trace(Result)})
end
end.
output_handler_rekey(Params, Seq) ->
case encrypt_and_send_rekey_chunk(Params, Seq) of
#params{} = Params_1 ->
- Params_1;
+ output_handler(Params_1, 0);
SendError ->
- _ = trace(SendError),
- death_row()
+ death_row({send_rekey, trace(SendError)})
end.
-output_handler_send(Params, Seq, {_, Size, _} = Q) ->
- if
- ?CHUNK_SIZE < Size ->
- output_handler_deq_send(Params, Seq, Q, ?CHUNK_SIZE);
- true ->
- case get_data(Params#params.dist_handle, Q) of
- {_, 0, _} ->
+
+%% Get outbound data from VM; encrypt and send,
+%% until the VM has no more
+%%
+output_handler_xfer(Params, Seq) ->
+ output_handler_xfer(Params, Seq, [], 0, []).
+%%
+%% Front,Size,Rear is an Okasaki queue of binaries with total byte Size
+%%
+output_handler_xfer(Params, Seq, Front, Size, Rear)
+ when ?CHUNK_SIZE =< Size ->
+ %%
+ %% We have a full chunk or more
+ %% -> collect one chunk or less and send
+ output_handler_collect(Params, Seq, Front, Size, Rear);
+output_handler_xfer(Params, Seq, Front, Size, Rear) ->
+ %% when Size < ?CHUNK_SIZE ->
+ %%
+ %% We do not have a full chunk -> try to fetch more from VM
+ case erlang:dist_ctrl_get_data(Params#params.dist_handle) of
+ none ->
+ if
+ Size =:= 0 ->
+ %% No more data from VM, nothing buffered
+ %% -> go back to lurking
{Params, Seq};
- {_, Size, _} = Q_1 -> % Got no more
- output_handler_deq_send(Params, Seq, Q_1, Size);
- Q_1 ->
- output_handler_send(Params, Seq, Q_1)
- end
+ true ->
+ %% The VM had no more -> send what we have
+ output_handler_collect(Params, Seq, Front, Size, Rear)
+ end;
+ {Len,Iov} ->
+ output_handler_enq(
+ Params, Seq, Front, Size + 4 + Len, [<<Len:32>>|Rear], Iov)
end.
-output_handler_deq_send(Params, Seq, Q, Size) ->
- {Cleartext, Q_1} = deq_iovec(Size, Q),
- case
- encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK, Cleartext])
- of
- {Params_1, Seq_1, ok} ->
- output_handler_send(Params_1, Seq_1, Q_1);
- {_, _, Error} ->
- _ = trace(Error),
- death_row()
+%% Enqueue VM data while splitting large binaries into ?CHUNK_SIZE
+%%
+output_handler_enq(Params, Seq, Front, Size, Rear, []) ->
+ output_handler_xfer(Params, Seq, Front, Size, Rear);
+output_handler_enq(Params, Seq, Front, Size, Rear, [Bin|Iov]) ->
+ output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin).
+%%
+output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin) ->
+ BinSize = byte_size(Bin),
+ if
+ BinSize =< ?CHUNK_SIZE ->
+ output_handler_enq(
+ Params, Seq, Front, Size, [Bin|Rear], Iov);
+ true ->
+ <<Bin1:?CHUNK_SIZE/binary, Bin2/binary>> = Bin,
+ output_handler_enq(
+ Params, Seq, Front, Size, [Bin1|Rear], Iov, Bin2)
end.
-%% -------------------------------------------------------------------------
-%% Input handler process
+%% Collect small binaries into chunks of at most ?CHUNK_SIZE
%%
-%% Here is T = 0|infinity to steer if we should try to receive
-%% more data or not; start with infinity, and when we get some
-%% data try with 0 to see if more is waiting
-
-input_handler(#params{socket = Socket} = Params, Seq, Q, T) ->
- receive
- Msg ->
- case Msg of
- {tcp_passive, Socket} ->
- ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}]),
- Q_1 =
- case T of
- 0 ->
- deliver_data(Params#params.dist_handle, Q);
- infinity ->
- Q
- end,
- input_handler(Params, Seq, Q_1, infinity);
- {tcp, Socket, Chunk} ->
- input_chunk(Params, Seq, Q, T, Chunk);
- {tcp_closed, Socket} ->
- exit(connection_closed);
- Other ->
- %% Ignore...
- _ = trace(Other),
- input_handler(Params, Seq, Q, T)
- end
- after T ->
- Q_1 = deliver_data(Params#params.dist_handle, Q),
- input_handler(Params, Seq, Q_1, infinity)
+output_handler_collect(Params, Seq, [], Zero, []) ->
+ 0 = Zero, % Assert
+ %% No more enqueued -> try to get more form VM
+ output_handler_xfer(Params, Seq);
+output_handler_collect(Params, Seq, Front, Size, Rear) ->
+ output_handler_collect(Params, Seq, Front, Size, Rear, [], 0).
+%%
+output_handler_collect(Params, Seq, [], Zero, [], Acc, DataSize) ->
+ 0 = Zero, % Assert
+ output_handler_chunk(Params, Seq, [], Zero, [], Acc, DataSize);
+output_handler_collect(Params, Seq, [], Size, Rear, Acc, DataSize) ->
+ %% Okasaki queue transfer Rear -> Front
+ output_handler_collect(
+ Params, Seq, lists:reverse(Rear), Size, [], Acc, DataSize);
+output_handler_collect(
+ Params, Seq, [Bin|Iov] = Front, Size, Rear, Acc, DataSize) ->
+ BinSize = byte_size(Bin),
+ DataSize_1 = DataSize + BinSize,
+ if
+ ?CHUNK_SIZE < DataSize_1 ->
+ %% Bin does not fit in chunk -> send Acc
+ output_handler_chunk(
+ Params, Seq, Front, Size, Rear, Acc, DataSize);
+ DataSize_1 < ?CHUNK_SIZE ->
+ %% Chunk not full yet -> try to accumulate more
+ output_handler_collect(
+ Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1);
+ true -> % DataSize_1 == ?CHUNK_SIZE ->
+ %% Optimize one iteration; Bin fits exactly -> accumulate and send
+ output_handler_chunk(
+ Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1)
end.
-input_chunk(Params, Seq, Q, T, Chunk) ->
- case decrypt_chunk(Params, Seq, Chunk) of
- <<?DATA_CHUNK, Cleartext/binary>> ->
- input_handler(Params, Seq + 1, enq_binary(Cleartext, Q), 0);
- <<?TICK_CHUNK, _/binary>> ->
- input_handler(Params, Seq + 1, Q, T);
- OtherChunk when is_binary(OtherChunk) ->
- _ = trace(invalid_chunk),
- exit(connection_closed);
- #params{} = Params_1 ->
- input_handler(Params_1, 0, Q, T);
- error ->
- _ = trace(decrypt_error),
- exit(connection_closed)
+%% Encrypt and send a chunk
+%%
+output_handler_chunk(Params, Seq, Front, Size, Rear, Acc, DataSize) ->
+ Data = lists:reverse(Acc),
+ {Params_1, Seq_1, Result} =
+ encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK|Data], 1 + DataSize),
+ if
+ Result =:= ok ->
+ %% Try to collect another chunk
+ output_handler_collect(Params_1, Seq_1, Front, Size, Rear);
+ true ->
+ death_row({send_chunk, trace(Result)})
end.
%% -------------------------------------------------------------------------
-%% erlang:dist_ctrl_* helpers
+%% Input handler process
+%%
-%% Get data for sending from the VM and place it in a queue
+%% Entry function
+input_handler(#params{socket = Socket} = Params, Seq, DistHandle) ->
+ try
+ ok =
+ inet:setopts(
+ Socket, [{active, ?TCP_ACTIVE}, nodelay()]),
+ input_handler(
+ Params#params{dist_handle = DistHandle},
+ Seq)
+ catch
+ Class : Reason : Stacktrace ->
+ error_logger:info_report(
+ [input_handler_exception,
+ {class, Class},
+ {reason, Reason},
+ {stacktrace, Stacktrace}]),
+ erlang:raise(Class, Reason, Stacktrace)
+ end.
+
+%% Loop top
+input_handler(Params, Seq) ->
+ %% Shortcut into the loop
+ {Params_1, Seq_1, Data} = input_data(Params, Seq),
+ input_handler(Params_1, Seq_1, Data, [], byte_size(Data)).
%%
-get_data(DistHandle, {Front, Size, Rear}) ->
- get_data(DistHandle, Front, Size, Rear).
+input_handler(Params, Seq, First, Buffer, Size) ->
+ %% Size is size of First + Buffer
+ case First of
+ <<Packet1Size:32, Packet1:Packet1Size/binary,
+ Packet2Size:32, Packet2:Packet2Size/binary, Rest/binary>> ->
+ DistHandle = Params#params.dist_handle,
+ erlang:dist_ctrl_put_data(DistHandle, Packet1),
+ erlang:dist_ctrl_put_data(DistHandle, Packet2),
+ input_handler(
+ Params, Seq, Rest,
+ Buffer, Size - (8 + Packet1Size + Packet2Size));
+ <<PacketSize:32, Packet:PacketSize/binary, Rest/binary>> ->
+ DistHandle = Params#params.dist_handle,
+ erlang:dist_ctrl_put_data(DistHandle, Packet),
+ input_handler(
+ Params, Seq, Rest, Buffer, Size - (4 + PacketSize));
+ <<PacketSize:32, PacketStart/binary>> ->
+ %% Partial packet in First
+ input_handler(
+ Params, Seq, PacketStart, Buffer, Size - 4, PacketSize);
+ <<Bin/binary>> ->
+ %% Partial header in First
+ if
+ 4 =< Size ->
+ %% Complete header in First + Buffer
+ {First_1, Buffer_1, PacketSize} =
+ input_get_packet_size(Bin, lists:reverse(Buffer)),
+ input_handler(
+ Params, Seq, First_1, Buffer_1, Size - 4, PacketSize);
+ true ->
+ %% Incomplete header received so far
+ {Params_1, Seq_1, More} = input_data(Params, Seq),
+ input_handler(
+ Params_1, Seq_1, Bin,
+ [More|Buffer], Size + byte_size(More))
+ end
+ end.
%%
-get_data(DistHandle, Front, Size, Rear) ->
- case erlang:dist_ctrl_get_data(DistHandle) of
- none ->
- {Front, Size, Rear};
- Bin when is_binary(Bin) ->
- Len = byte_size(Bin),
- get_data(
- DistHandle, Front, Size + 4 + Len,
- [Bin, <<Len:32>>|Rear]);
- [Bin1, Bin2] ->
- Len = byte_size(Bin1) + byte_size(Bin2),
- get_data(
- DistHandle, Front, Size + 4 + Len,
- [Bin2, Bin1, <<Len:32>>|Rear]);
- Iovec ->
- Len = iolist_size(Iovec),
- get_data(
- DistHandle, Front, Size + 4 + Len,
- lists:reverse(Iovec, [<<Len:32>>|Rear]))
+input_handler(Params, Seq, PacketStart, Buffer, Size, PacketSize) ->
+ %% Size is size of PacketStart + Buffer
+ RestSize = Size - PacketSize,
+ if
+ RestSize < 0 ->
+ %% Incomplete packet received so far
+ {Params_1, Seq_1, More} = input_data(Params, Seq),
+ input_handler(
+ Params_1, Seq_1, PacketStart,
+ [More|Buffer], Size + byte_size(More), PacketSize);
+ 0 < RestSize, Buffer =:= [] ->
+ %% Rest data in PacketStart
+ <<Packet:PacketSize/binary, Rest/binary>> = PacketStart,
+ DistHandle = Params#params.dist_handle,
+ erlang:dist_ctrl_put_data(DistHandle, Packet),
+ input_handler(Params, Seq, Rest, [], RestSize);
+ Buffer =:= [] -> % RestSize == 0
+ %% No rest data
+ DistHandle = Params#params.dist_handle,
+ erlang:dist_ctrl_put_data(DistHandle, PacketStart),
+ input_handler(Params, Seq);
+ true ->
+ %% Split packet from rest data
+ LastBin = hd(Buffer),
+ <<PacketLast:(byte_size(LastBin) - RestSize)/binary,
+ Rest/binary>> = LastBin,
+ Packet = [PacketStart|lists:reverse(tl(Buffer), PacketLast)],
+ DistHandle = Params#params.dist_handle,
+ erlang:dist_ctrl_put_data(DistHandle, Packet),
+ input_handler(Params, Seq, Rest, [], RestSize)
end.
-%% De-packet and deliver received data to the VM from a queue
-%%
-deliver_data(DistHandle, Q) ->
- case Q of
- {[], Size, []} ->
- Size = 0, % Assert
- Q;
- {[], Size, Rear} ->
- [Bin|Front] = lists:reverse(Rear),
- deliver_data(DistHandle, Front, Size, [], Bin);
- {[Bin|Front], Size, Rear} ->
- deliver_data(DistHandle, Front, Size, Rear, Bin)
+input_get_packet_size(First, [Bin|Buffer]) ->
+ MissingSize = 4 - byte_size(First),
+ if
+ MissingSize =< byte_size(Bin) ->
+ <<Last:MissingSize/binary, Rest/binary>> = Bin,
+ <<PacketSize:32>> = <<First/binary, Last/binary>>,
+ {Rest, lists:reverse(Buffer), PacketSize};
+ true ->
+ input_get_packet_size(<<First/binary, Bin/binary>>, Buffer)
end.
+
+input_data(Params, Seq) ->
+ receive Msg -> input_data(Params, Seq, Msg) end.
%%
-deliver_data(DistHandle, Front, Size, Rear, Bin) ->
- case Bin of
- <<DataSizeA:32, DataA:DataSizeA/binary,
- DataSizeB:32, DataB:DataSizeB/binary, Rest/binary>> ->
- erlang:dist_ctrl_put_data(DistHandle, DataA),
- erlang:dist_ctrl_put_data(DistHandle, DataB),
- deliver_data(
- DistHandle,
- Front, Size - (4 + DataSizeA + 4 + DataSizeB), Rear,
- Rest);
- <<DataSize:32, Data:DataSize/binary, Rest/binary>> ->
- erlang:dist_ctrl_put_data(DistHandle, Data),
- deliver_data(DistHandle, Front, Size - (4 + DataSize), Rear, Rest);
- <<DataSize:32, FirstData/binary>> ->
- TotalSize = 4 + DataSize,
- if
- TotalSize =< Size ->
- BinSize = byte_size(Bin),
- {MoreData, Q} =
- deq_iovec(
- TotalSize - BinSize,
- Front, Size - BinSize, Rear),
- erlang:dist_ctrl_put_data(DistHandle, [FirstData|MoreData]),
- deliver_data(DistHandle, Q);
- true -> % Incomplete data
- {[Bin|Front], Size, Rear}
+input_data(#params{socket = Socket} = Params, Seq, Msg) ->
+ case Msg of
+ {tcp_passive, Socket} ->
+ ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}]),
+ input_data(Params, Seq);
+ {tcp, Socket, Ciphertext} ->
+ case decrypt_chunk(Params, Seq, Ciphertext) of
+ <<?DATA_CHUNK, Chunk/binary>> ->
+ {Params, Seq + 1, Chunk};
+ <<?TICK_CHUNK, _Dummy/binary>> ->
+ input_data(Params, Seq + 1);
+ <<UnknownChunk/binary>> ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason, unknown_chunk}]),
+ _ = trace(UnknownChunk),
+ exit(connection_closed);
+ #params{} = Params_1 ->
+ input_data(Params_1, 0);
+ error ->
+ _ = trace(decrypt_error),
+ exit(connection_closed)
end;
- <<_/binary>> ->
- BinSize = byte_size(Bin),
- if
- 4 =< Size -> % Fragmented header - extract a header bin
- {RestHeader, {Front_1, _Size_1, Rear_1}} =
- deq_iovec(4 - BinSize, Front, Size - BinSize, Rear),
- Header = iolist_to_binary([Bin|RestHeader]),
- deliver_data(DistHandle, Front_1, Size, Rear_1, Header);
- true -> % Incomplete header
- {[Bin|Front], Size, Rear}
- end
+ {tcp_closed = Reason, Socket} ->
+ error_logger:info_report(
+ [?FUNCTION_NAME,
+ {reason, Reason}]),
+ exit(connection_closed);
+ Other ->
+ %% Ignore...
+ _ = trace(Other),
+ input_data(Params, Seq)
end.
%% -------------------------------------------------------------------------
@@ -1400,20 +1544,23 @@ deliver_data(DistHandle, Front, Size, Rear, Bin) ->
encrypt_and_send_chunk(
#params{
- socket = Socket, rekey_count = Seq, rekey_msg = RekeyMsg} = Params,
- Seq, Cleartext) ->
+ socket = Socket, rekey_count = RekeyCount, rekey_msg = RekeyMsg} = Params,
+ Seq, Cleartext, Size) when Seq =:= RekeyCount ->
%%
cancel_rekey_timer(RekeyMsg),
case encrypt_and_send_rekey_chunk(Params, Seq) of
#params{} = Params_1 ->
Result =
- gen_tcp:send(Socket, encrypt_chunk(Params, 0, Cleartext)),
+ gen_tcp:send(
+ Socket, encrypt_chunk(Params, 0, Cleartext, Size)),
{Params_1, 1, Result};
SendError ->
{Params, Seq + 1, SendError}
end;
-encrypt_and_send_chunk(#params{socket = Socket} = Params, Seq, Cleartext) ->
- Result = gen_tcp:send(Socket, encrypt_chunk(Params, Seq, Cleartext)),
+encrypt_and_send_chunk(
+ #params{socket = Socket} = Params, Seq, Cleartext, Size) ->
+ Result =
+ gen_tcp:send(Socket, encrypt_chunk(Params, Seq, Cleartext, Size)),
{Params, Seq + 1, Result}.
encrypt_and_send_rekey_chunk(
@@ -1430,7 +1577,9 @@ encrypt_and_send_rekey_chunk(
#key_pair{public = PubKeyA} = KeyPair = get_new_key_pair(),
case
gen_tcp:send(
- Socket, encrypt_chunk(Params, Seq, [?REKEY_CHUNK, PubKeyA]))
+ Socket,
+ encrypt_chunk(
+ Params, Seq, [?REKEY_CHUNK, PubKeyA], 1 + byte_size(PubKeyA)))
of
ok ->
SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
@@ -1445,18 +1594,19 @@ encrypt_and_send_rekey_chunk(
SendError ->
SendError
end.
-
+
encrypt_chunk(
#params{
aead_cipher = AeadCipher,
- iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen}, Seq, Cleartext) ->
+ iv = {IVSalt, IVNo}, key = Key, tag_len = TagLen},
+ Seq, Cleartext, Size) ->
%%
- ChunkLen = iolist_size(Cleartext) + TagLen,
+ ChunkLen = Size + TagLen,
AAD = <<Seq:32, ChunkLen:32>>,
IVBin = <<IVSalt/binary, (IVNo + Seq):48>>,
{Ciphertext, CipherTag} =
- crypto:crypto_one_time(AeadCipher, Key, IVBin,
- {AAD, Cleartext, TagLen}, true),
+ crypto:crypto_one_time_aead(
+ AeadCipher, Key, IVBin, Cleartext, AAD, TagLen, true),
Chunk = [Ciphertext,CipherTag],
Chunk.
@@ -1468,31 +1618,33 @@ decrypt_chunk(
ChunkLen = byte_size(Chunk),
if
ChunkLen < TagLen ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,short_chunk}]),
error;
true ->
AAD = <<Seq:32, ChunkLen:32>>,
IVBin = <<IVSalt/binary, (IVNo + Seq):48>>,
CiphertextLen = ChunkLen - TagLen,
- case Chunk of
- <<Ciphertext:CiphertextLen/binary,
- CipherTag:TagLen/binary>> ->
- block_decrypt(
- Params, Seq, AeadCipher, Key, IVBin,
- {AAD, Ciphertext, CipherTag});
- _ ->
- error
- end
+ <<Ciphertext:CiphertextLen/binary,
+ CipherTag:TagLen/binary>> = Chunk,
+ block_decrypt(
+ Params, Seq, AeadCipher, Key, IVBin,
+ Ciphertext, AAD, CipherTag)
end.
block_decrypt(
#params{
rekey_key = #key_pair{public = PubKeyA} = KeyPair,
rekey_count = RekeyCount} = Params,
- Seq, AeadCipher, Key, IV, Data) ->
- case crypto:crypto_one_time(AeadCipher, Key, IV, Data, false) of
- <<?REKEY_CHUNK, Rest/binary>> ->
+ Seq, AeadCipher, Key, IV, Ciphertext, AAD, CipherTag) ->
+ case
+ crypto:crypto_one_time_aead(
+ AeadCipher, Key, IV, Ciphertext, AAD, CipherTag, false)
+ of
+ <<?REKEY_CHUNK, Chunk/binary>> ->
PubKeyLen = byte_size(PubKeyA),
- case Rest of
+ case Chunk of
<<PubKeyB:PubKeyLen/binary>> ->
SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
KeyLen = byte_size(Key),
@@ -1504,56 +1656,50 @@ block_decrypt(
SharedSecret, [Key, IV], KeyLen, IVLen),
Params#params{iv = {IVSalt, IVNo}, key = Key_1};
_ ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,bad_rekey_chunk}]),
error
end;
Chunk when is_binary(Chunk) ->
case Seq of
RekeyCount ->
%% This was one chunk too many without rekeying
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,rekey_overdue}]),
error;
_ ->
Chunk
end;
error ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,decrypt_error}]),
error
end.
%% -------------------------------------------------------------------------
-%% Queue of binaries i.e an iovec queue
-
-empty_q() ->
- {[], 0, []}.
-enq_binary(Bin, {Front, Size, Rear}) ->
- {Front, Size + byte_size(Bin), [Bin|Rear]}.
+%% Wait for getting killed by process link,
+%% and if that does not happen - drop dead
-deq_iovec(GetSize, {Front, Size, Rear}) when GetSize =< Size ->
- deq_iovec(GetSize, Front, Size, Rear, []).
-%%
-deq_iovec(GetSize, Front, Size, Rear) ->
- deq_iovec(GetSize, Front, Size, Rear, []).
-%%
-deq_iovec(GetSize, [], Size, Rear, Acc) ->
- deq_iovec(GetSize, lists:reverse(Rear), Size, [], Acc);
-deq_iovec(GetSize, [Bin|Front], Size, Rear, Acc) ->
- BinSize = byte_size(Bin),
- if
- BinSize < GetSize ->
- deq_iovec(
- GetSize - BinSize, Front, Size - BinSize, Rear, [Bin|Acc]);
- GetSize < BinSize ->
- {Bin1,Bin2} = erlang:split_binary(Bin, GetSize),
- {lists:reverse(Acc, [Bin1]), {[Bin2|Front], Size - GetSize, Rear}};
- true ->
- {lists:reverse(Acc, [Bin]), {Front, Size - BinSize, Rear}}
+death_row(Reason) ->
+ error_logger:info_report(
+ [?FUNCTION_NAME,
+ {reason, Reason},
+ {pid, self()}]),
+ receive
+ after 5000 ->
+ death_row_timeout(Reason)
end.
-%% -------------------------------------------------------------------------
-
-death_row() -> death_row(connection_closed).
-%%
-death_row(normal) -> death_row(connection_closed);
-death_row(Reason) -> receive after 5000 -> exit(Reason) end.
+death_row_timeout(Reason) ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason, Reason},
+ {pid, self()}]),
+ exit(Reason).
%% -------------------------------------------------------------------------
@@ -1561,23 +1707,27 @@ death_row(Reason) -> receive after 5000 -> exit(Reason) end.
trace(Term) -> Term.
%% Keep an eye on this Pid (debug)
--ifndef(undefined).
-monitor_dist_proc(Pid) ->
+-ifdef(undefined).
+monitor_dist_proc(_Tag, Pid) ->
Pid.
-else.
-monitor_dist_proc(Pid) ->
+monitor_dist_proc(Tag, Pid) ->
spawn(
fun () ->
MRef = erlang:monitor(process, Pid),
+ error_logger:info_report(
+ [?FUNCTION_NAME,
+ {type, Tag},
+ {pid, Pid}]),
receive
{'DOWN', MRef, _, _, normal} ->
error_logger:error_report(
- [dist_proc_died,
+ [?FUNCTION_NAME,
{reason, normal},
{pid, Pid}]);
{'DOWN', MRef, _, _, Reason} ->
error_logger:info_report(
- [dist_proc_died,
+ [?FUNCTION_NAME,
{reason, Reason},
{pid, Pid}])
end