summaryrefslogtreecommitdiff
path: root/erts
diff options
context:
space:
mode:
authorRaimo Niskanen <raimo@erlang.org>2020-01-21 16:28:28 +0100
committerRaimo Niskanen <raimo@erlang.org>2020-01-22 09:58:55 +0100
commit8a577a4682f8b050f3d0dc0433b331db5bf57b4d (patch)
treeaedcd70b98f66c7f90d7285d758b3769b0729cad /erts
parent155d2d4374f1efcec7800ae27bccfd528407b04f (diff)
downloaderlang-8a577a4682f8b050f3d0dc0433b331db5bf57b4d.tar.gz
Clean up send code and ref handling
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c13
-rw-r--r--erts/preloaded/ebin/socket.beambin80112 -> 79808 bytes
-rw-r--r--erts/preloaded/src/socket.erl204
3 files changed, 111 insertions, 106 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 5cd2167d04..688bac5751 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -6798,7 +6798,7 @@ ERL_NIF_TERM esock_send(ErlNifEnv* env,
if (!descP->isWritable)
return enif_make_badarg(env);
- /* Check if there is already a current writer and if its us */
+ /* Ensure that we either have no current writer or we are it */
if (!send_check_writer(env, descP, sendRef, &writerCheck))
return writerCheck;
@@ -6930,7 +6930,7 @@ ERL_NIF_TERM esock_sendto(ErlNifEnv* env,
if (!descP->isWritable)
return enif_make_badarg(env);
- /* Check if there is already a current writer and if its us */
+ /* Ensure that we either have no current writer or we are it */
if (!send_check_writer(env, descP, sendRef, &writerCheck))
return writerCheck;
@@ -7060,7 +7060,7 @@ ERL_NIF_TERM esock_sendmsg(ErlNifEnv* env,
return enif_make_badarg(env);
}
- /* Check if there is already a current writer and if its us */
+ /* Ensure that we either have no current writer or we are it */
if (!send_check_writer(env, descP, sendRef, &writerCheck)) {
SSDBG( descP,
@@ -15668,11 +15668,14 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
enif_set_pid_undefined(&descP->currentWriter.pid);
return esock_make_error(env, atom_exmon);
} else {
- ESOCK_ASSERT(!descP->currentWriter.env);
+ ESOCK_ASSERT(descP->currentWriter.env == NULL);
descP->currentWriter.env = esock_alloc_env("current-writer");
- descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef);
+ descP->currentWriter.ref =
+ CP_TERM(descP->currentWriter.env, sendRef);
descP->currentWriterP = &descP->currentWriter;
}
+ } else {
+ descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef);
}
ESOCK_CNT_INC(env, descP, sockRef, atom_write_waits, &descP->writeWaits, 1);
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index 83140a39e0..bd238f0f7d 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 8727e663b7..dc6085eb52 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -1300,13 +1300,15 @@ bind(#socket{ref = SockRef}, Addr)
when ((Addr =:= any) orelse
(Addr =:= broadcast) orelse
(Addr =:= loopback)) ->
- try which_domain(SockRef) of
- inet ->
- nif_bind(SockRef, ?SOCKADDR_IN4_DEFAULT(Addr));
- inet6 when (Addr =:= any) orelse (Addr =:= loopback) ->
- nif_bind(SockRef, ?SOCKADDR_IN6_DEFAULT(Addr));
- _ ->
- einval()
+ try
+ case which_domain(SockRef) of
+ inet ->
+ nif_bind(SockRef, ?SOCKADDR_IN4_DEFAULT(Addr));
+ inet6 when (Addr =:= any) orelse (Addr =:= loopback) ->
+ nif_bind(SockRef, ?SOCKADDR_IN6_DEFAULT(Addr));
+ _ ->
+ einval()
+ end
catch
%% <WIN32-TEMPORARY>
error:notsup:S ->
@@ -1634,67 +1636,87 @@ send(#socket{ref = SockRef}, Data, Flags, Timeout)
((Timeout =:= nowait) orelse
(Timeout =:= infinity) orelse
(is_integer(Timeout) andalso (Timeout > 0))) ->
+ To = undefined,
EFlags = enc_send_flags(Flags),
- do_send(SockRef, Data, EFlags, Timeout).
+ Deadline = deadline(Timeout),
+ send_common(SockRef, Data, To, EFlags, Deadline, send).
+
+
+send_common(SockRef, Data, To, EFlags, Deadline, SendName) ->
-do_send(SockRef, Data, EFlags, Timeout) ->
- TS = timestamp(Timeout),
SendRef = make_ref(),
- case nif_send(SockRef, SendRef, Data, EFlags) of
- ok ->
- ok;
+ case
+ case SendName of
+ send ->
+ nif_send(SockRef, SendRef, Data, EFlags);
+ sendto ->
+ nif_sendto(SockRef, SendRef, Data, To, EFlags)
+ end
+ of
- {ok, Written} when (Timeout =:= nowait) ->
- <<_:Written/binary, Rest/binary>> = Data,
+ ok -> ok;
+
+
+ {ok, Written} when (Deadline =:= nowait) ->
%% We are partially done, but the user don't want to wait (here)
%% for completion
- {ok, {Rest, ?SELECT_INFO(send, SendRef)}};
-
+ <<_:Written/binary, Rest/binary>> = Data,
+ {ok, {Rest, ?SELECT_INFO(SendName, SendRef)}};
{ok, Written} ->
- NewTimeout = next_timeout(TS, Timeout),
%% We are partially done, wait for continuation
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef}
when (Written > 0) ->
<<_:Written/binary, Rest/binary>> = Data,
- do_send(SockRef, Rest, EFlags,
- next_timeout(TS, Timeout));
+ send_common(
+ SockRef, Rest, To, EFlags, Deadline, SendName);
{?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_send(SockRef, Data, EFlags,
- next_timeout(TS, Timeout));
+ send_common(
+ SockRef, Data, To, EFlags, Deadline, SendName);
{?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} ->
{error, Reason}
- after NewTimeout ->
- cancel(SockRef, send, SendRef),
+ after Timeout ->
+ cancel(SockRef, SendName, SendRef),
{error, {timeout, size(Data)}}
end;
- {error, eagain} when (Timeout =:= nowait) ->
- ?SELECT(send, SendRef);
+ {error, exbusy} = Error when Deadline =:= nowait -> Error;
+
+ {error, exbusy = Reason} ->
+ %% Internal error:
+ %% we called send, got eagain, and called send again
+ %% - without waiting for continuation
+ erlang:error(Reason);
+
+ {error, eagain} when (Deadline =:= nowait) ->
+ ?SELECT(SendName, SendRef);
{error, eagain} ->
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_send(SockRef, Data, EFlags,
- next_timeout(TS, Timeout));
+ send_common(
+ SockRef, Data, To, EFlags, Deadline, SendName);
{?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} ->
{error, Reason}
after Timeout ->
- cancel(SockRef, send, SendRef),
+ cancel(SockRef, SendName, SendRef),
{error, {timeout, size(Data)}}
end;
- {error, _} = ERROR ->
- ERROR
+
+ {error, Reason} ->
+ {error, {Reason, size(Data)}}
end.
@@ -1767,66 +1789,10 @@ sendto(#socket{ref = SockRef}, Data, #{family := Fam} = Dest, Flags, Timeout)
((Timeout =:= nowait) orelse
(Timeout =:= infinity) orelse
(is_integer(Timeout) andalso (Timeout > 0))) ->
+ To = ensure_sockaddr(Dest),
EFlags = enc_send_flags(Flags),
- do_sendto(SockRef, Data, ensure_sockaddr(Dest), EFlags, Timeout).
-
-do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
- TS = timestamp(Timeout),
- SendRef = make_ref(),
- case nif_sendto(SockRef, SendRef, Data, Dest, EFlags) of
- ok ->
- %% We are done
- ok;
-
- {ok, Written} when (Timeout =:= nowait) ->
- <<_:Written/binary, Rest/binary>> = Data,
- {ok, {Rest, ?SELECT_INFO(sendto, SendRef)}};
-
-
- {ok, Written} ->
- %% We are partially done, wait for continuation
- receive
- {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef}
- when (Written > 0) ->
- <<_:Written/binary, Rest/binary>> = Data,
- do_sendto(SockRef, Rest, Dest, EFlags,
- next_timeout(TS, Timeout));
-
- {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_sendto(SockRef, Data, Dest, EFlags,
- next_timeout(TS, Timeout));
-
- {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} ->
- {error, Reason}
-
- after Timeout ->
- cancel(SockRef, sendto, SendRef),
- {error, timeout}
- end;
-
-
- {error, eagain} when (Timeout =:= nowait) ->
- ?SELECT(sendto, SendRef);
-
-
- {error, eagain} ->
- receive
- {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_sendto(SockRef, Data, Dest, EFlags,
- next_timeout(TS, Timeout));
-
- {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} ->
- {error, Reason}
-
- after Timeout ->
- cancel(SockRef, sendto, SendRef),
- {error, timeout}
- end;
-
- {error, _} = ERROR ->
- ERROR
- end.
-
+ Deadline = deadline(Timeout),
+ send_common(SockRef, Data, To, EFlags, Deadline, sendto).
%% ---------------------------------------------------------------------------
@@ -1900,20 +1866,28 @@ sendmsg(#socket{ref = SockRef}, #{iov := IOV} = MsgHdr, Flags, Timeout)
((Timeout =:= nowait) orelse
(Timeout =:= infinity) orelse
(is_integer(Timeout) andalso (Timeout > 0))) ->
- try ensure_msghdr(MsgHdr) of
- M ->
+ try
+ begin
+ M = ensure_msghdr(MsgHdr),
EFlags = enc_send_flags(Flags),
- do_sendmsg(SockRef, M, EFlags, Timeout)
+ Deadline = deadline(Timeout),
+ do_sendmsg(SockRef, M, EFlags, Deadline)
+ end
catch
throw:T ->
T;
+ %% <WIN32-TEMPORARY>
+ error:notsup:S ->
+ erlang:raise(error, notsup, S);
+ %% </WIN32-TEMPORARY>
error:Reason ->
{error, Reason}
end.
-do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) ->
- TS = timestamp(Timeout),
+do_sendmsg(SockRef, MsgHdr, EFlags, Deadline) ->
+
SendRef = make_ref(),
+
case nif_sendmsg(SockRef, SendRef, MsgHdr, EFlags) of
ok ->
%% We are done
@@ -1925,28 +1899,36 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) ->
%% be able to handle a message being split. Leave it to
%% the caller to figure out (call again with the rest).
%%
- %% We should really not need to cancel, since this is
- %% accepted for sendmsg!
+ %% We need to cancel this partial write.
%%
cancel(SockRef, sendmsg, SendRef),
{ok, do_sendmsg_rest(maps:get(iov, MsgHdr), Written)};
- {error, eagain} when (Timeout =:= nowait) ->
+ {error, exbusy} = Error when Deadline =:= nowait -> Error;
+
+ {error, exbusy = Reason} ->
+ %% Internal error:
+ %% we called send, got eagain, and called send again
+ %% - without waiting for continuation
+ erlang:error(Reason);
+
+
+ {error, eagain} when (Deadline =:= nowait) ->
?SELECT(sendmsg, SendRef);
-
{error, eagain} ->
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_sendmsg(SockRef, MsgHdr, EFlags,
- next_timeout(TS, Timeout))
+ do_sendmsg(SockRef, MsgHdr, EFlags, Deadline)
after Timeout ->
cancel(SockRef, sendmsg, SendRef),
{error, timeout}
end;
+
{error, _} = ERROR ->
ERROR
end.
@@ -3967,6 +3949,26 @@ flush_select_msgs(SockRef, Ref) ->
%% lists:flatten(FormatDate).
+deadline(Timeout) ->
+ case Timeout of
+ nowait -> Timeout;
+ infinity -> Timeout;
+ _ ->
+ timestamp() + Timeout
+ end.
+
+timeout(Deadline) ->
+ case Deadline of
+ infinity -> infinity;
+ _ ->
+ Now = timestamp(),
+ if
+ Now < Deadline -> Deadline - Now;
+ true -> 0
+ end
+ end.
+
+
%% A timestamp in ms
timestamp(nowait = T) ->