summaryrefslogtreecommitdiff
path: root/erts
diff options
context:
space:
mode:
authorRaimo Niskanen <raimo@erlang.org>2020-02-07 14:11:22 +0100
committerRaimo Niskanen <raimo@erlang.org>2020-02-07 14:11:22 +0100
commit129361a04639482957db8082f658a7f8626a536e (patch)
tree946819655a1548919d018f6e4aa004d532cb5987 /erts
parent266f2dce5b41127a80c54a8290d6c08b0659e9e8 (diff)
parentcd96f9c6934d48d08be61f63b37221a76946d21c (diff)
downloaderlang-129361a04639482957db8082f658a7f8626a536e.tar.gz
Merge branch 'raimo/erts/socket-fixes/OTP-14644' into maint
* raimo/erts/socket-fixes/OTP-14644: Stop using typeof for struct timeval Fix lock order Cleanup locks Cleanup redundant cleanup code Cleanup requestor handling Fix specs Merge domain+type+proto in one getopt Cleanup timeout handling Cleanup parameter check functions Cleanup try-catch handling Fix nif_select and closeMtx handling Fix lock order Bugfix: do not read freed refs Try fix double free Fix close and abort handling Clean up send code and ref handling remove-unsuccesful-experiment: Fix timing dependent test case Fix timing dependent test case Do not self-close socket for econnreset Avoid deadlock and redundant close + abort message
Diffstat (limited to 'erts')
-rw-r--r--erts/configure.in1
-rw-r--r--erts/emulator/nifs/common/socket_nif.c764
-rw-r--r--erts/emulator/nifs/common/socket_util.c51
-rw-r--r--erts/emulator/test/socket_SUITE.erl21
-rw-r--r--erts/preloaded/ebin/socket.beambin80460 -> 79040 bytes
-rw-r--r--erts/preloaded/src/socket.erl962
6 files changed, 964 insertions, 835 deletions
diff --git a/erts/configure.in b/erts/configure.in
index 71c38129b4..609c457393 100644
--- a/erts/configure.in
+++ b/erts/configure.in
@@ -1818,6 +1818,7 @@ AC_CHECK_SIZEOF(long long)
AC_CHECK_SIZEOF(size_t)
AC_CHECK_SIZEOF(off_t)
AC_CHECK_SIZEOF(time_t)
+AC_CHECK_SIZEOF(suseconds_t)
BITS64=
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 36dfe4e937..ab95748458 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -368,7 +368,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#define ESOCK_GLOBAL_DEBUG_DEFAULT FALSE
#define ESOCK_DEBUG_DEFAULT FALSE
-/* Counters and stuff (Don't know where to sent this stuff anyway) */
+/* Counters and stuff (Don't know where to sen2 this stuff anyway) */
#define ESOCK_NIF_IOW_DEFAULT FALSE
@@ -487,6 +487,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#define ESOCK_RECV_FLAG_LOW ESOCK_RECV_FLAG_CMSG_CLOEXEC
#define ESOCK_RECV_FLAG_HIGH ESOCK_RECV_FLAG_TRUNC
+#define ESOCK_RECV_BUFFER_COUNT_DEFAULT 0
#define ESOCK_RECV_BUFFER_SIZE_DEFAULT 8192
#define ESOCK_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024
#define ESOCK_SEND_CTRL_BUFFER_SIZE_DEFAULT 1024
@@ -581,6 +582,7 @@ typedef union {
#define ESOCK_OPT_OTP_DOMAIN 0xFF01 // INTERNAL AND ONLY GET
#define ESOCK_OPT_OTP_TYPE 0xFF02 // INTERNAL AND ONLY GET
#define ESOCK_OPT_OTP_PROTOCOL 0xFF03 // INTERNAL AND ONLY GET
+#define ESOCK_OPT_OTP_DTP 0xFF04 // INTERNAL AND ONLY GET
#define ESOCK_OPT_SOCK_ACCEPTCONN 1
#define ESOCK_OPT_SOCK_BINDTODEVICE 3
@@ -984,6 +986,9 @@ typedef struct {
ERL_NIF_TERM closeRef;
BOOLEAN_T closeLocal;
+ /* Lock order: closeMtx, readMtx, accMtx, writeMtx, cfgMtx
+ * unordered: cntMtx
+ */
} ESockDescriptor;
@@ -993,13 +998,19 @@ typedef struct {
/* These are for debugging, testing and the like */
// ERL_NIF_TERM version;
// ERL_NIF_TERM buildDate;
+
+ /* XXX Should be locked but too awkward and small gain */
BOOLEAN_T dbg;
/* Registry stuff */
- ErlNifPid regPid;
+ ErlNifPid regPid; /* Constant - not locked */
+ /* XXX
+ * Should be locked but too awkward for no gain since it is not used yet
+ */
BOOLEAN_T iow; // Where do we send this? Subscription?
- ErlNifMutex* cntMtx;
+
+ ErlNifMutex* cntMtx; /* Locks the below */
/* Its extreme overkill to have these counters be 64-bit,
* but since the other counters are, its much simpler to
* let to let these be 64-bit also
@@ -1831,6 +1842,7 @@ static ERL_NIF_TERM esock_getopt_otp(ErlNifEnv* env,
* *** esock_getopt_otp_domain ***
* *** esock_getopt_otp_type ***
* *** esock_getopt_otp_protocol ***
+ * *** esock_getopt_otp_dtp ***
*/
#define ESOCK_GETOPT_OTP_FUNCS \
ESOCK_GETOPT_OTP_FUNC_DEF(debug); \
@@ -1843,7 +1855,8 @@ static ERL_NIF_TERM esock_getopt_otp(ErlNifEnv* env,
ESOCK_GETOPT_OTP_FUNC_DEF(fd); \
ESOCK_GETOPT_OTP_FUNC_DEF(domain); \
ESOCK_GETOPT_OTP_FUNC_DEF(type); \
- ESOCK_GETOPT_OTP_FUNC_DEF(protocol);
+ ESOCK_GETOPT_OTP_FUNC_DEF(protocol); \
+ ESOCK_GETOPT_OTP_FUNC_DEF(dtp);
#define ESOCK_GETOPT_OTP_FUNC_DEF(F) \
static ERL_NIF_TERM esock_getopt_otp_##F(ErlNifEnv* env, \
ESockDescriptor* descP)
@@ -2317,6 +2330,10 @@ static ERL_NIF_TERM send_check_fail(ErlNifEnv* env,
ESockDescriptor* descP,
int saveErrno,
ERL_NIF_TERM sockRef);
+static void send_error_waiting_writers(ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM reason);
static ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
ESockDescriptor* descP,
ssize_t written,
@@ -2370,10 +2387,10 @@ static ERL_NIF_TERM recv_check_fail(ErlNifEnv* env,
ErlNifBinary* buf2P,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef);
-static ERL_NIF_TERM recv_check_fail_closed(ErlNifEnv* env,
- ESockDescriptor* descP,
- ERL_NIF_TERM sockRef,
- ERL_NIF_TERM recvRef);
+static ERL_NIF_TERM recv_check_fail_econnreset(ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM recvRef);
static ERL_NIF_TERM recv_check_partial(ErlNifEnv* env,
ESockDescriptor* descP,
ssize_t read,
@@ -2671,6 +2688,8 @@ ESOCK_OPERATOR_FUNCS_DEFS
static BOOLEAN_T requestor_pop(ESockRequestQueue* q,
ESockRequestor* reqP);
+static void requestor_clear(ESockRequestor* reqP);
+
static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
ESockRequestQueue* q,
ErlNifPid* pid);
@@ -2695,6 +2714,10 @@ static int esock_demonitor(const char* slogan,
static void esock_monitor_init(ESockMonitor* mon);
static ERL_NIF_TERM esock_make_monitor_term(ErlNifEnv* env,
const ESockMonitor* monP);
+static void esock_release_current(const char* slogan,
+ ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ESockRequestor* current);
#endif // if defined(__WIN32__)
@@ -3190,6 +3213,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket')
/* Local error reason atoms */
#define LOCAL_ERROR_REASON_ATOMS \
+ LOCAL_ATOM_DECL(econnreset); \
LOCAL_ATOM_DECL(eisconn); \
LOCAL_ATOM_DECL(enotclosing); \
LOCAL_ATOM_DECL(enotconn); \
@@ -3340,37 +3364,51 @@ ERL_NIF_TERM nif_info(ErlNifEnv* env,
static
ERL_NIF_TERM esock_global_info(ErlNifEnv* env)
{
- ERL_NIF_TERM numBits = MKCT(env, atom_num_cnt_bits, ESOCK_COUNTER_SIZE);
- ERL_NIF_TERM numSockets = MKCT(env, atom_num_sockets, data.numSockets);
- ERL_NIF_TERM numTypeDGrams = MKCT(env, atom_num_tdgrams, data.numTypeDGrams);
- ERL_NIF_TERM numTypeStreams = MKCT(env, atom_num_tstreams, data.numTypeStreams);
- ERL_NIF_TERM numTypeSeqPkgs = MKCT(env, atom_num_tseqpkgs, data.numTypeSeqPkgs);
- ERL_NIF_TERM numDomLocal = MKCT(env, atom_num_dlocal, data.numDomainLocal);
- ERL_NIF_TERM numDomInet = MKCT(env, atom_num_dinet, data.numDomainInet);
- ERL_NIF_TERM numDomInet6 = MKCT(env, atom_num_dinet6, data.numDomainInet6);
- ERL_NIF_TERM numProtoIP = MKCT(env, atom_num_pip, data.numProtoIP);
- ERL_NIF_TERM numProtoTCP = MKCT(env, atom_num_ptcp, data.numProtoTCP);
- ERL_NIF_TERM numProtoUDP = MKCT(env, atom_num_pudp, data.numProtoUDP);
- ERL_NIF_TERM numProtoSCTP = MKCT(env, atom_num_psctp, data.numProtoSCTP);
- ERL_NIF_TERM gcnt[] = {numBits,
- numSockets,
- numTypeDGrams, numTypeStreams, numTypeSeqPkgs,
- numDomLocal, numDomInet, numDomInet6,
- numProtoIP, numProtoTCP, numProtoUDP, numProtoSCTP};
- unsigned int lenGCnt = sizeof(gcnt) / sizeof(ERL_NIF_TERM);
- ERL_NIF_TERM lgcnt = MKLA(env, gcnt, lenGCnt);
- ERL_NIF_TERM keys[] = {esock_atom_debug, atom_iow, atom_counters};
- ERL_NIF_TERM vals[] = {BOOL2ATOM(data.dbg), BOOL2ATOM(data.iow), lgcnt};
- ERL_NIF_TERM info;
- unsigned int numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM);
- unsigned int numVals = sizeof(vals) / sizeof(ERL_NIF_TERM);
+ ERL_NIF_TERM
+ numBits, numSockets, numTypeDGrams, numTypeStreams,
+ numTypeSeqPkgs, numDomLocal, numDomInet, numDomInet6,
+ numProtoIP, numProtoTCP, numProtoUDP, numProtoSCTP;
- ESOCK_ASSERT( (numKeys == numVals) );
+ MLOCK(data.cntMtx);
+ numBits = MKCT(env, atom_num_cnt_bits, ESOCK_COUNTER_SIZE);
+ numSockets = MKCT(env, atom_num_sockets, data.numSockets);
+ numTypeDGrams = MKCT(env, atom_num_tdgrams, data.numTypeDGrams);
+ numTypeStreams = MKCT(env, atom_num_tstreams, data.numTypeStreams);
+ numTypeSeqPkgs = MKCT(env, atom_num_tseqpkgs, data.numTypeSeqPkgs);
+ numDomLocal = MKCT(env, atom_num_dlocal, data.numDomainLocal);
+ numDomInet = MKCT(env, atom_num_dinet, data.numDomainInet);
+ numDomInet6 = MKCT(env, atom_num_dinet6, data.numDomainInet6);
+ numProtoIP = MKCT(env, atom_num_pip, data.numProtoIP);
+ numProtoTCP = MKCT(env, atom_num_ptcp, data.numProtoTCP);
+ numProtoUDP = MKCT(env, atom_num_pudp, data.numProtoUDP);
+ numProtoSCTP = MKCT(env, atom_num_psctp, data.numProtoSCTP);
+ MUNLOCK(data.cntMtx);
- if (!MKMA(env, keys, vals, numKeys, &info))
- return enif_make_badarg(env);
+ {
+ ERL_NIF_TERM gcnt[] =
+ {numBits,
+ numSockets,
+ numTypeDGrams, numTypeStreams, numTypeSeqPkgs,
+ numDomLocal, numDomInet, numDomInet6,
+ numProtoIP, numProtoTCP, numProtoUDP, numProtoSCTP};
+ unsigned int lenGCnt =
+ sizeof(gcnt) / sizeof(ERL_NIF_TERM);
+ ERL_NIF_TERM
+ lgcnt = MKLA(env, gcnt, lenGCnt),
+ keys[] = {esock_atom_debug, atom_iow, atom_counters},
+ vals[] = {BOOL2ATOM(data.dbg), BOOL2ATOM(data.iow), lgcnt},
+ info;
+ unsigned int
+ numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM),
+ numVals = sizeof(vals) / sizeof(ERL_NIF_TERM);
- return info;
+ ESOCK_ASSERT( (numKeys == numVals) );
+
+ if (!MKMA(env, keys, vals, numKeys, &info))
+ return enif_make_badarg(env);
+
+ return info;
+ }
}
@@ -3511,8 +3549,8 @@ ERL_NIF_TERM esock_socket_info_counters(ErlNifEnv* env,
{
ERL_NIF_TERM info;
- MLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
+ MLOCK(descP->writeMtx);
{
ERL_NIF_TERM readByteCnt = MKCT(env, atom_read_byte, descP->readByteCnt);
@@ -3548,8 +3586,8 @@ ERL_NIF_TERM esock_socket_info_counters(ErlNifEnv* env,
}
- MUNLOCK(descP->readMtx);
MUNLOCK(descP->writeMtx);
+ MUNLOCK(descP->readMtx);
SSDBG( descP, ("SOCKET", "esock_socket_info_counters -> done with"
"\r\n info: %T"
@@ -5735,8 +5773,8 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env,
* safe side we do the best we can to avoid complications...
*/
- MLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
+ MLOCK(descP->writeMtx);
MLOCK(descP->cfgMtx);
res = esock_connect(env, descP, sockRef);
@@ -6465,13 +6503,6 @@ ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env,
if (esock_accept_accepted(env, descP, sockRef, accSock,
descP->currentAcceptor.pid, remote, &res)) {
- /* Clean out the old cobweb's before trying to invite a new spider */
-
- esock_free_env("esock_accept_accepting_current_accept - "
- "current-accept-env",
- descP->currentAcceptor.env);
- descP->currentAcceptor.env = NULL;
-
if (!activate_next_acceptor(env, descP, sockRef)) {
SSDBG( descP,
@@ -6482,13 +6513,6 @@ ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env,
descP->state = ESOCK_STATE_LISTENING;
descP->currentAcceptorP = NULL;
- /* Do we really need this?
- * The activate_next_acceptor (actually the requestor_pop) function
- * initiates these values if there are no waiting acceptor...
- */
- descP->currentAcceptor.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentAcceptor.pid);
- MON_INIT(&descP->currentAcceptor.mon);
}
}
@@ -6510,10 +6534,8 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env,
ERL_NIF_TERM opRef,
int save_errno)
{
- ESockRequestor req;
ERL_NIF_TERM res, reason;
- req.env = NULL;
if (save_errno == ERRNO_BLOCK) {
/*
@@ -6533,12 +6555,17 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env,
descP->state);
} else {
+ ESockRequestor req;
ESOCK_CNT_INC(env, descP, sockRef, atom_acc_fails, &descP->accFails, 1);
+ esock_release_current("esock_accept_accepting_current_error",
+ env, descP, descP->currentAcceptorP);
+
reason = MKA(env, erl_errno_id(save_errno));
res = esock_make_error(env, reason);
+ req.env = NULL;
while (acceptor_pop(env, descP, &req)) {
SSDBG( descP,
("SOCKET",
@@ -6550,7 +6577,7 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env,
DEMONP("esock_accept_accepting_current_error -> pop'ed writer",
env, descP, &req.mon);
}
-
+ descP->currentAcceptorP = NULL;
}
return res;
@@ -6599,6 +6626,21 @@ ERL_NIF_TERM esock_accept_busy_retry(ErlNifEnv* env,
if ((sres = esock_select_read(env, descP->sock, descP, pid,
sockRef, accRef)) < 0) {
+
+ DEMONP("esock_accept_busy_retry - select failed",
+ env, descP, &descP->currentAcceptor.mon);
+ /* It is very unlikely that a next acceptor will be able
+ * to do anything succesful, but we will clean the queue
+ */
+ if (!activate_next_acceptor(env, descP, sockRef)) {
+ SSDBG( descP,
+ ("SOCKET",
+ "esock_accept_busy_retry -> no more acceptors\r\n") );
+
+ descP->state = ESOCK_STATE_LISTENING;
+ descP->currentAcceptorP = NULL;
+ }
+
reason = MKT2(env, esock_atom_select_failed, MKI(env, sres));
res = esock_make_error(env, reason);
} else {
@@ -6658,6 +6700,7 @@ BOOLEAN_T esock_accept_accepted(ErlNifEnv* env,
accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer size
accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size
accDescP->iow = descP->iow; // Inherit iow
+ accDescP->dbg = descP->dbg; // Inherit debug flag
accRef = enif_make_resource(env, accDescP);
enif_release_resource(accDescP);
@@ -6727,12 +6770,14 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
if ((argc != 4) ||
!GET_BIN(env, argv[2], &sndData) ||
!GET_UINT(env, argv[3], &eflags)) {
+ SSDBG( descP, ("SOCKET", "nif_send -> argv decode failed\r\n") );
return enif_make_badarg(env);
}
sockRef = argv[0]; // We need this in case we send in case we send abort
sendRef = argv[1];
if (!ESOCK_GET_RESOURCE(env, sockRef, (void**) &descP)) {
+ SSDBG( descP, ("SOCKET", "nif_send -> get resource failed\r\n") );
return enif_make_badarg(env);
}
@@ -6744,8 +6789,10 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
"\r\n eFlags: 0x%lX"
"\r\n", descP->sock, sockRef, sendRef, sndData.size, eflags) );
- if (!esendflags2sendflags(eflags, &flags))
+ if (!esendflags2sendflags(eflags, &flags)) {
+ SSDBG( descP, ("SOCKET", "nif_send -> sendflags decode failed\r\n") );
return esock_make_error(env, esock_atom_einval);
+ }
SSDBG( descP, ("SOCKET", "nif_send -> flags: 0x%lX\r\n", flags) );
@@ -6790,12 +6837,17 @@ ERL_NIF_TERM esock_send(ErlNifEnv* env,
ssize_t written;
ERL_NIF_TERM writerCheck;
- if (!descP->isWritable)
- return enif_make_badarg(env);
+ if (!descP->isWritable) {
+ SSDBG( descP, ("SOCKET", "esock_send -> return not writable\r\n") );
+ return esock_make_error(env, atom_closed);
+ }
- /* Check if there is already a current writer and if its us */
- if (!send_check_writer(env, descP, sendRef, &writerCheck))
+ /* Ensure that we either have no current writer or that we are it */
+ if (!send_check_writer(env, descP, sendRef, &writerCheck)) {
+ SSDBG( descP, ("SOCKET", "esock_send -> writer check failed: "
+ "\r\n %T\r\n", writerCheck) );
return writerCheck;
+ }
/* We ignore the wrap for the moment.
* Maybe we should issue a wrap-message to controlling process...
@@ -6857,6 +6909,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
if ((argc != 5) ||
!GET_BIN(env, argv[2], &sndData) ||
!GET_UINT(env, argv[4], &eflags)) {
+ SSDBG( descP, ("SOCKET", "nif_sendto -> argv decode failed\r\n") );
return enif_make_badarg(env);
}
sockRef = argv[0]; // We need this in case we send abort (to the caller)
@@ -6864,6 +6917,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
eSockAddr = argv[3];
if (!ESOCK_GET_RESOURCE(env, sockRef, (void**) &descP)) {
+ SSDBG( descP, ("SOCKET", "nif_sendto -> get resource failed\r\n") );
return enif_make_badarg(env);
}
@@ -6922,12 +6976,17 @@ ERL_NIF_TERM esock_sendto(ErlNifEnv* env,
ssize_t written;
ERL_NIF_TERM writerCheck;
- if (!descP->isWritable)
- return enif_make_badarg(env);
+ if (!descP->isWritable) {
+ SSDBG( descP, ("SOCKET", "esock_sendto -> return not writable\r\n") );
+ return esock_make_error(env, atom_closed);
+ }
- /* Check if there is already a current writer and if its us */
- if (!send_check_writer(env, descP, sendRef, &writerCheck))
+ /* Ensure that we either have no current writer or we are it */
+ if (!send_check_writer(env, descP, sendRef, &writerCheck)) {
+ SSDBG( descP, ("SOCKET", "esock_sendto -> writer check failed: "
+ "\r\n %T\r\n", writerCheck) );
return writerCheck;
+ }
/* We ignore the wrap for the moment.
* Maybe we should issue a wrap-message to controlling process...
@@ -6988,6 +7047,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
if ((argc != 4) ||
!IS_MAP(env, argv[2]) ||
!GET_UINT(env, argv[3], &eflags)) {
+ SSDBG( descP, ("SOCKET", "nif_sendmsg -> argv decode failed\r\n") );
return enif_make_badarg(env);
}
sockRef = argv[0]; // We need this in case we send abort (to the caller)
@@ -6995,6 +7055,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
eMsgHdr = argv[2];
if (!ESOCK_GET_RESOURCE(env, sockRef, (void**) &descP)) {
+ SSDBG( descP, ("SOCKET", "nif_sendmsg -> get resource failed\r\n") );
return enif_make_badarg(env);
}
@@ -7006,8 +7067,10 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
"\r\n",
descP->sock, argv[0], sendRef, eflags) );
- if (!esendflags2sendflags(eflags, &flags))
+ if (!esendflags2sendflags(eflags, &flags)) {
+ SSDBG( descP, ("SOCKET", "nif_sendmsg -> sendflags decode failed\r\n") );
return esock_make_error(env, esock_atom_einval);
+ }
MLOCK(descP->writeMtx);
@@ -7049,19 +7112,15 @@ ERL_NIF_TERM esock_sendmsg(ErlNifEnv* env,
char* xres;
if (!descP->isWritable) {
-
SSDBG( descP, ("SOCKET", "esock_sendmsg -> not writable\r\n") );
-
- return enif_make_badarg(env);
+ return esock_make_error(env, atom_closed);
}
- /* Check if there is already a current writer and if its us */
+ /* Ensure that we either have no current writer or we are it */
if (!send_check_writer(env, descP, sendRef, &writerCheck)) {
-
- SSDBG( descP,
- ("SOCKET", "esock_sendmsg -> writer check failed: "
- "\r\n %T\r\n", writerCheck) );
-
+ SSDBG( descP,
+ ("SOCKET", "esock_sendmsg -> writer check failed: "
+ "\r\n %T\r\n", writerCheck) );
return writerCheck;
}
@@ -7333,6 +7392,8 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env,
MUNLOCK(descP->readMtx);
+ SSDBG( descP, ("SOCKET", "nif_recv -> done: %T\r\n", res) );
+
return res;
#endif // if defined(__WIN32__)
@@ -7367,9 +7428,9 @@ ERL_NIF_TERM esock_recv(ErlNifEnv* env,
flags) );
if (!descP->isReadable)
- return enif_make_badarg(env);
+ return esock_make_error(env, atom_closed);
- /* Check if there is already a current reader and if its us */
+ /* Ensure that we either have no current reader or that we are it */
if (!recv_check_reader(env, descP, recvRef, &readerCheck))
return readerCheck;
@@ -7532,9 +7593,9 @@ ERL_NIF_TERM esock_recvfrom(ErlNifEnv* env,
"\r\n", len, bufSz, flags) );
if (!descP->isReadable)
- return enif_make_badarg(env);
+ return esock_make_error(env, atom_closed);
- /* Check if there is already a current reader and if its us */
+ /* Ensure that we either have no current reader or that we are it */
if (!recv_check_reader(env, descP, recvRef, &readerCheck))
return readerCheck;
@@ -7707,9 +7768,9 @@ ERL_NIF_TERM esock_recvmsg(ErlNifEnv* env,
"\r\n", bufSz, bufLen, ctrlSz, ctrlLen, flags) );
if (!descP->isReadable)
- return enif_make_badarg(env);
+ return esock_make_error(env, atom_closed);
- /* Check if there is already a current reader and if its us */
+ /* Ensure that we either have no current reader or that we are it */
if (!recv_check_reader(env, descP, recvRef, &readerCheck))
return readerCheck;
@@ -7786,6 +7847,9 @@ ERL_NIF_TERM nif_close(ErlNifEnv* env,
return enif_raise_exception(env, MKA(env, "notsup"));
#else
ESockDescriptor* descP;
+ ERL_NIF_TERM res;
+
+ SGDBG( ("SOCKET", "nif_close -> entry with argc: %d\r\n", argc) );
if ((argc != 1) ||
!ESOCK_GET_RESOURCE(env, argv[0], (void**) &descP)) {
@@ -7795,7 +7859,15 @@ ERL_NIF_TERM nif_close(ErlNifEnv* env,
if (IS_CLOSED(descP) || IS_CLOSING(descP))
return esock_make_error(env, atom_closed);
- return esock_close(env, descP);
+ MLOCK(descP->closeMtx);
+
+ res = esock_close(env, descP);
+
+ MUNLOCK(descP->closeMtx);
+
+ SSDBG( descP, ("SOCKET", "nif_close -> res: %T\r\n", res) );
+
+ return res;
#endif // if defined(__WIN32__)
}
@@ -7816,8 +7888,6 @@ ERL_NIF_TERM esock_close(ErlNifEnv* env,
descP->currentReaderP,
descP->currentAcceptorP) );
- MLOCK(descP->closeMtx);
-
doClose = esock_close_check(env, descP, &reason);
if (doClose) {
@@ -7826,8 +7896,6 @@ ERL_NIF_TERM esock_close(ErlNifEnv* env,
reply = esock_make_error(env, reason);
}
- MUNLOCK(descP->closeMtx);
-
SSDBG( descP,
("SOCKET", "esock_close -> [%d] done when: "
"\r\n state: 0x%lX"
@@ -8096,13 +8164,14 @@ ERL_NIF_TERM nif_shutdown(ErlNifEnv* env,
return enif_make_badarg(env);
}
- if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ if (IS_CLOSED(descP))
return esock_make_error(env, atom_closed);
if (!ehow2how(ehow, &how))
return enif_make_badarg(env);
return esock_shutdown(env, descP, how);
+
#endif // if defined(__WIN32__)
}
@@ -11891,7 +11960,7 @@ ERL_NIF_TERM nif_getopt(ErlNifEnv* env,
eIsEncoded = argv[1];
eOpt = argv[3]; // Is "normally" an int, but if raw mode: {Int, ValueSz}
- if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ if (IS_CLOSED(descP))
return esock_make_error(env, atom_closed);
SSDBG( descP,
@@ -12027,6 +12096,10 @@ ERL_NIF_TERM esock_getopt_otp(ErlNifEnv* env,
result = esock_getopt_otp_protocol(env, descP);
break;
+ case ESOCK_OPT_OTP_DTP:
+ result = esock_getopt_otp_dtp(env, descP);
+ break;
+
default:
result = esock_make_error(env, esock_atom_einval);
break;
@@ -12144,128 +12217,175 @@ ERL_NIF_TERM esock_getopt_otp_meta(ErlNifEnv* env,
}
-/* esock_getopt_otp_domain - Handle the OTP (level) domain option
- */
static
-ERL_NIF_TERM esock_getopt_otp_domain(ErlNifEnv* env,
- ESockDescriptor* descP)
+ERL_NIF_TERM getopt_otp_domain(ErlNifEnv* env, int domain)
{
- ERL_NIF_TERM result, reason;
- int val = descP->domain;
+ ERL_NIF_TERM result;
- switch (val) {
+ switch (domain) {
case AF_INET:
- result = esock_make_ok2(env, esock_atom_inet);
+ result = esock_atom_inet;
break;
#if defined(HAVE_IN6) && defined(AF_INET6)
case AF_INET6:
- result = esock_make_ok2(env, esock_atom_inet6);
+ result = esock_atom_inet6;
break;
#endif
#if defined(HAVE_SYS_UN_H)
case AF_UNIX:
- result = esock_make_ok2(env, esock_atom_local);
+ result = esock_atom_local;
break;
#endif
default:
- reason = MKT2(env, esock_atom_unknown, MKI(env, val));
- result = esock_make_error(env, reason);
+ result = MKI(env, domain);
break;
}
return result;
}
+/*
+ * esock_getopt_otp_domain - Handle the OTP (level) domain option
+ */
+static
+ERL_NIF_TERM esock_getopt_otp_domain(ErlNifEnv* env,
+ ESockDescriptor* descP)
+{
+ ERL_NIF_TERM domain, result;
+
+ domain = getopt_otp_domain(env, descP->domain);
+ result = esock_make_ok2(env, domain);
+
+ return result;
+}
-/* esock_getopt_otp_type - Handle the OTP (level) type options.
- */
static
-ERL_NIF_TERM esock_getopt_otp_type(ErlNifEnv* env,
- ESockDescriptor* descP)
+ERL_NIF_TERM getopt_otp_type(ErlNifEnv* env, int type)
{
- ERL_NIF_TERM result, reason;
- int val = descP->type;
+ ERL_NIF_TERM result;
- switch (val) {
+ switch (type) {
case SOCK_STREAM:
- result = esock_make_ok2(env, esock_atom_stream);
+ result = esock_atom_stream;
break;
case SOCK_DGRAM:
- result = esock_make_ok2(env, esock_atom_dgram);
+ result = esock_atom_dgram;
break;
#ifdef HAVE_SCTP
case SOCK_SEQPACKET:
- result = esock_make_ok2(env, esock_atom_seqpacket);
+ result = esock_atom_seqpacket;
break;
#endif
case SOCK_RAW:
- result = esock_make_ok2(env, esock_atom_raw);
+ result = esock_atom_raw;
break;
case SOCK_RDM:
- result = esock_make_ok2(env, esock_atom_rdm);
+ result = esock_atom_rdm;
break;
default:
- reason = MKT2(env, esock_atom_unknown, MKI(env, val));
- result = esock_make_error(env, reason);
+ result = MKI(env, type);
break;
}
return result;
}
+/*
+ * esock_getopt_otp_type - Handle the OTP (level) type options.
+ */
+static
+ERL_NIF_TERM esock_getopt_otp_type(ErlNifEnv* env,
+ ESockDescriptor* descP)
+{
+ ERL_NIF_TERM type, result;
+
+ type = getopt_otp_type(env, descP->type);
+ result = esock_make_ok2(env, type);
+
+ return result;
+}
-/* esock_getopt_otp_protocol - Handle the OTP (level) protocol options.
- */
static
-ERL_NIF_TERM esock_getopt_otp_protocol(ErlNifEnv* env,
- ESockDescriptor* descP)
+ERL_NIF_TERM getopt_otp_protocol(ErlNifEnv* env,
+ ESockDescriptor* descP)
{
- ERL_NIF_TERM result, reason;
+ ERL_NIF_TERM result;
int val = descP->protocol;
- switch (val) {
- case IPPROTO_IP:
+ switch (val) {
+ case IPPROTO_IP:
#if defined(AF_LOCAL)
- if (descP->domain == AF_LOCAL) {
- result = esock_make_ok2(env, esock_atom_default);
- } else {
- result = esock_make_ok2(env, esock_atom_ip);
- }
+ if (descP->domain == AF_LOCAL) {
+ result = esock_atom_default;
+ } else {
+ result = esock_atom_ip;
+ }
#else
- result = esock_make_ok2(env, esock_atom_ip);
+ result = esock_atom_ip;
#endif
- break;
+ break;
- case IPPROTO_TCP:
- result = esock_make_ok2(env, esock_atom_tcp);
- break;
+ case IPPROTO_TCP:
+ result = esock_atom_tcp;
+ break;
- case IPPROTO_UDP:
- result = esock_make_ok2(env, esock_atom_udp);
- break;
+ case IPPROTO_UDP:
+ result = esock_atom_udp;
+ break;
#if defined(HAVE_SCTP)
- case IPPROTO_SCTP:
- result = esock_make_ok2(env, esock_atom_sctp);
- break;
+ case IPPROTO_SCTP:
+ result = esock_atom_sctp;
+ break;
#endif
- default:
- reason = MKT2(env, esock_atom_unknown, MKI(env, val));
- result = esock_make_error(env, reason);
- break;
+ default:
+ result = MKI(env, val);
+ break;
}
return result;
}
+/*
+ * esock_getopt_otp_protocol - Handle the OTP (level) protocol options.
+ */
+static
+ERL_NIF_TERM esock_getopt_otp_protocol(ErlNifEnv* env,
+ ESockDescriptor* descP)
+{
+ ERL_NIF_TERM protocol, result;
+ protocol = getopt_otp_protocol(env, descP);
+ result = esock_make_ok2(env, protocol);
+
+ return result;
+}
+
+
+/*
+ * esock_getopt_otp_dtp - Handle the OTP (level) type options.
+ */
+static
+ERL_NIF_TERM esock_getopt_otp_dtp(ErlNifEnv* env,
+ ESockDescriptor* descP)
+{
+ ERL_NIF_TERM domain, type, protocol, dtp, result;
+
+ domain = getopt_otp_domain(env, descP->domain);
+ type = getopt_otp_type(env, descP->type);
+ protocol = getopt_otp_protocol(env, descP);
+ dtp = MKT3(env, domain, type, protocol);
+ result = esock_make_ok2(env, dtp);
+
+ return result;
+}
/* The option has *not* been encoded. Instead it has been provided
@@ -15062,12 +15182,6 @@ ERL_NIF_TERM esock_cancel_accept_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET",
"esock_cancel_accept_current -> cancel res: %T\r\n", res) );
- /* Clean out the old cobweb's before trying to invite a new spider */
-
- esock_free_env("esock_cancel_accept_current - current-accept-env",
- descP->currentAcceptor.env);
- descP->currentAcceptor.env = NULL;
-
if (!activate_next_acceptor(env, descP, sockRef)) {
SSDBG( descP,
@@ -15077,13 +15191,6 @@ ERL_NIF_TERM esock_cancel_accept_current(ErlNifEnv* env,
descP->state = ESOCK_STATE_LISTENING;
descP->currentAcceptorP = NULL;
- /* Do we really need this?
- * The activate_next_acceptor (actually the requestor_pop) function
- * initiates these values if there are no waiting acceptor...
- */
- descP->currentAcceptor.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentAcceptor.pid);
- MON_INIT(&descP->currentAcceptor.mon);
}
SSDBG( descP, ("SOCKET", "esock_cancel_accept_current -> done with result:"
@@ -15187,10 +15294,8 @@ ERL_NIF_TERM esock_cancel_send_current(ErlNifEnv* env,
if (!activate_next_writer(env, descP, sockRef)) {
SSDBG( descP,
("SOCKET", "esock_cancel_send_current -> no more writers\r\n") );
- descP->currentWriterP = NULL;
- descP->currentWriter.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentWriter.pid);
- esock_monitor_init(&descP->currentWriter.mon);
+
+ descP->currentWriterP = NULL;
}
SSDBG( descP, ("SOCKET", "esock_cancel_send_current -> done with result:"
@@ -15293,10 +15398,8 @@ ERL_NIF_TERM esock_cancel_recv_current(ErlNifEnv* env,
if (!activate_next_reader(env, descP, sockRef)) {
SSDBG( descP,
("SOCKET", "esock_cancel_recv_current -> no more readers\r\n") );
- descP->currentReaderP = NULL;
- descP->currentReader.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentReader.pid);
- esock_monitor_init(&descP->currentReader.mon);
+
+ descP->currentReaderP = NULL;
}
SSDBG( descP, ("SOCKET", "esock_cancel_recv_current -> done with result:"
@@ -15361,14 +15464,19 @@ ERL_NIF_TERM esock_cancel_mode_select(ErlNifEnv* env,
int smode,
int rmode)
{
+ /* Assumes cancelling only one mode */
+
int selectRes = esock_select_cancel(env, descP->sock, smode, descP);
- if (selectRes & rmode) {
- /* Was cancelled */
- return esock_atom_ok;
- } else if (selectRes > 0) {
- /* Has already sent the message */
- return esock_make_error(env, esock_atom_select_sent);
+ if (selectRes >= 0) {
+ /* Success */
+ if ((selectRes & rmode) != 0) {
+ /* Was cancelled */
+ return esock_atom_ok;
+ } else {
+ /* Has already sent the message */
+ return esock_make_error(env, esock_atom_select_sent);
+ }
} else {
/* Stopped? */
SSDBG( descP, ("SOCKET",
@@ -15406,6 +15514,8 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env,
if (enif_self(env, &caller) == NULL) {
*checkResult = esock_make_error(env, atom_exself);
+ SSDBG( descP, ("SOCKET",
+ "send_check_writer -> exself\r\n") );
return FALSE;
}
@@ -15419,12 +15529,12 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env,
if (!writer_search4pid(env, descP, &caller))
*checkResult = writer_push(env, descP, caller, ref);
else
- *checkResult = esock_make_error(env, esock_atom_eagain);
+ *checkResult = esock_make_error(env, atom_exbusy);
SSDBG( descP,
("SOCKET",
"send_check_writer -> queue (push) result: %T\r\n",
- checkResult) );
+ *checkResult) );
return FALSE;
@@ -15533,28 +15643,23 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env,
descP->writePkgMax = descP->writePkgMaxCnt;
descP->writePkgMaxCnt = 0;
- if (descP->currentWriterP != NULL) {
- DEMONP("send_check_ok -> current writer",
- env, descP, &descP->currentWriter.mon);
- esock_free_env("send_check_ok", descP->currentWriter.env);
- descP->currentWriter.env = NULL;
- }
-
SSDBG( descP,
("SOCKET", "send_check_ok -> "
"everything written (%d,%d) - done\r\n", dataSize, written) );
+ if (descP->currentWriterP != NULL) {
+ DEMONP("send_check_ok -> current writer",
+ env, descP, &descP->currentWriter.mon);
+ }
/*
* Ok, this write is done maybe activate the next (if any)
*/
-
if (!activate_next_writer(env, descP, sockRef)) {
- descP->currentWriterP = NULL;
- ESOCK_ASSERT(!descP->currentWriter.env);
- descP->currentWriter.env = NULL;
- descP->currentWriter.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentWriter.pid);
- esock_monitor_init(&descP->currentWriter.mon);
+
+ SSDBG( descP,
+ ("SOCKET", "send_check_ok -> no more writers\r\n") );
+
+ descP->currentWriterP = NULL;
}
return esock_atom_ok;
@@ -15573,10 +15678,8 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env,
int saveErrno,
ERL_NIF_TERM sockRef)
{
- ESockRequestor req;
- ERL_NIF_TERM reason;
+ ERL_NIF_TERM reason;
- req.env = NULL;
ESOCK_CNT_INC(env, descP, sockRef, atom_write_fails, &descP->writeFails, 1);
SSDBG( descP, ("SOCKET", "send_check_fail -> error: %d\r\n", saveErrno) );
@@ -15592,24 +15695,44 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env,
if (descP->currentWriterP != NULL) {
- DEMONP("send_check_fail -> current writer",
- env, descP, &descP->currentWriter.mon);
+ esock_release_current("send_check_fail",
+ env, descP, descP->currentWriterP);
- while (writer_pop(env, descP, &req)) {
- SSDBG( descP,
- ("SOCKET", "send_check_fail -> abort %T\r\n", req.pid) );
- esock_send_abort_msg(env, sockRef, req.ref, req.env,
- reason, &req.pid);
- req.env = NULL;
- DEMONP("send_check_fail -> pop'ed writer",
- env, descP, &req.mon);
- }
+ send_error_waiting_writers(env, descP, sockRef, reason);
+
+ descP->currentWriterP = NULL;
}
}
-
return esock_make_error(env, reason);
}
+/* *** send_error_current_writer ***
+ *
+ * Process all waiting writers when a fatal error has occured.
+ * All waiting writers will be "aborted", that is a
+ * nif_abort message will be sent (with ref and reason).
+ */
+static
+void send_error_waiting_writers(ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM reason)
+{
+ ESockRequestor req;
+
+ req.env = NULL; /* read by writer_pop before free */
+ while (writer_pop(env, descP, &req)) {
+ SSDBG( descP,
+ ("SOCKET", "send_error_current_writer -> abort %T\r\n",
+ req.pid) );
+ esock_send_abort_msg(env, sockRef, req.ref, req.env,
+ reason, &req.pid);
+ req.env = NULL;
+ DEMONP("send_error_waiting_writers -> pop'ed writer",
+ env, descP, &req.mon);
+ }
+}
+
/* *** send_check_retry ***
@@ -15644,11 +15767,14 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
enif_set_pid_undefined(&descP->currentWriter.pid);
return esock_make_error(env, atom_exmon);
} else {
- ESOCK_ASSERT(!descP->currentWriter.env);
+ ESOCK_ASSERT(descP->currentWriter.env == NULL);
descP->currentWriter.env = esock_alloc_env("current-writer");
- descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef);
+ descP->currentWriter.ref =
+ CP_TERM(descP->currentWriter.env, sendRef);
descP->currentWriterP = &descP->currentWriter;
}
+ } else {
+ descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef);
}
ESOCK_CNT_INC(env, descP, sockRef, atom_write_waits, &descP->writeWaits, 1);
@@ -15724,8 +15850,6 @@ BOOLEAN_T recv_check_reader(ErlNifEnv* env,
}
if (COMPARE_PIDS(&descP->currentReader.pid, &caller) != 0) {
- ERL_NIF_TERM tmp;
-
/* Not the "current reader", so (maybe) push onto queue */
SSDBG( descP,
@@ -15733,15 +15857,14 @@ BOOLEAN_T recv_check_reader(ErlNifEnv* env,
"recv_check_reader -> not (current) reader\r\n") );
if (!reader_search4pid(env, descP, &caller))
- tmp = reader_push(env, descP, caller, ref);
+ *checkResult = reader_push(env, descP, caller, ref);
else
- tmp = esock_make_error(env, esock_atom_eagain);
+ *checkResult = esock_make_error(env, atom_exbusy);
SSDBG( descP,
("SOCKET",
- "recv_check_reader -> queue (push) result: %T\r\n", tmp) );
-
- *checkResult = tmp;
+ "recv_check_reader -> queue (push) result: %T\r\n",
+ *checkResult) );
return FALSE;
@@ -15824,20 +15947,13 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
DEMONP("recv_update_current_reader",
env, descP, &descP->currentReader.mon);
- esock_free_env("recv_update_current_reader - current-read-env",
- descP->currentReader.env);
- descP->currentReader.env = NULL;
-
if (!activate_next_reader(env, descP, sockRef)) {
SSDBG( descP,
("SOCKET",
"recv_update_current_reader -> no more readers\r\n") );
- descP->currentReaderP = NULL;
- descP->currentReader.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentReader.pid);
- esock_monitor_init(&descP->currentReader.mon);
+ descP->currentReaderP = NULL;
}
}
@@ -15852,7 +15968,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
* Process the current reader and any waiting readers
* when a read (fatal) error has occured.
* All waiting readers will be "aborted", that is a
- * nif_abort message will be sent (with reaf and reason).
+ * nif_abort message will be sent (with ref and reason).
*/
static
void recv_error_current_reader(ErlNifEnv* env,
@@ -15860,14 +15976,13 @@ void recv_error_current_reader(ErlNifEnv* env,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM reason)
{
- ESockRequestor req;
-
- req.env = NULL;
if (descP->currentReaderP != NULL) {
+ ESockRequestor req;
- DEMONP("recv_error_current_reader -> current reader",
- env, descP, &descP->currentReader.mon);
+ esock_release_current("recv_error_current_reader",
+ env, descP, descP->currentReaderP);
+ req.env = NULL; /* read by reader_pop before free */
while (reader_pop(env, descP, &req)) {
SSDBG( descP,
("SOCKET", "recv_error_current_reader -> abort %T\r\n",
@@ -15878,6 +15993,8 @@ void recv_error_current_reader(ErlNifEnv* env,
DEMONP("recv_error_current_reader -> pop'ed reader",
env, descP, &req.mon);
}
+
+ descP->currentReaderP = NULL;
}
}
@@ -15918,8 +16035,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
*/
if ((read == 0) && (descP->type == SOCK_STREAM)) {
-
- res = esock_make_error(env, atom_closed);
+ ERL_NIF_TERM reason = atom_closed;
+ res = esock_make_error(env, reason);
ESOCK_CNT_INC(env, descP, sockRef, atom_read_fails, &descP->readFails, 1);
@@ -15932,7 +16049,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* We must also notify any waiting readers!
*/
- recv_error_current_reader(env, descP, sockRef, res);
+ recv_error_current_reader(env, descP, sockRef, reason);
FREE_BIN(bufP);
@@ -16074,27 +16191,25 @@ ERL_NIF_TERM recv_check_full_maybe_done(ErlNifEnv* env,
ESOCK_CNT_INC(env, descP, sockRef, atom_read_byte, &descP->readByteCnt, read);
descP->readPkgMaxCnt += read;
- if (descP->rNum > 0) {
-
- descP->rNumCnt++;
- if (descP->rNumCnt >= descP->rNum) {
+ descP->rNumCnt++;
+ if (descP->rNumCnt >= descP->rNum) {
- descP->rNumCnt = 0;
+ descP->rNumCnt = 0;
- ESOCK_CNT_INC(env, descP, sockRef, atom_read_pkg, &descP->readPkgCnt, 1);
- if (descP->readPkgMaxCnt > descP->readPkgMax)
- descP->readPkgMax = descP->readPkgMaxCnt;
- descP->readPkgMaxCnt = 0;
+ ESOCK_CNT_INC(env, descP, sockRef,
+ atom_read_pkg, &descP->readPkgCnt, 1);
+ if (descP->readPkgMaxCnt > descP->readPkgMax)
+ descP->readPkgMax = descP->readPkgMaxCnt;
+ descP->readPkgMaxCnt = 0;
- recv_update_current_reader(env, descP, sockRef);
+ recv_update_current_reader(env, descP, sockRef);
- /* This transfers "ownership" of the *allocated* binary to an
- * erlang term (no need for an explicit free).
- */
+ /* This transfers "ownership" of the *allocated* binary to an
+ * erlang term (no need for an explicit free).
+ */
- return esock_make_ok3(env, atom_true, MKBIN(env, bufP));
+ return esock_make_ok3(env, atom_true, MKBIN(env, bufP));
- }
}
/* Yes, we *do* need to continue reading */
@@ -16179,13 +16294,13 @@ ERL_NIF_TERM recv_check_fail(ErlNifEnv* env,
/* +++ Oups - closed +++ */
- SSDBG( descP, ("SOCKET", "recv_check_fail -> closed\r\n") );
+ SSDBG( descP, ("SOCKET", "recv_check_fail econnreset -> closed\r\n") );
// This is a bit overkill (to count here), but just in case...
ESOCK_CNT_INC(env, descP, sockRef, atom_read_fails,
&descP->readFails, 1);
- res = recv_check_fail_closed(env, descP, sockRef, recvRef);
+ res = recv_check_fail_econnreset(env, descP, sockRef, recvRef);
} else if ((saveErrno == ERRNO_BLOCK) ||
(saveErrno == EAGAIN)) {
@@ -16210,19 +16325,19 @@ ERL_NIF_TERM recv_check_fail(ErlNifEnv* env,
-/* *** recv_check_fail_closed ***
+/* *** recv_check_fail_econnreset ***
*
* We detected that the socket was closed wile reading.
* Inform current and waiting readers.
*/
static
-ERL_NIF_TERM recv_check_fail_closed(ErlNifEnv* env,
- ESockDescriptor* descP,
- ERL_NIF_TERM sockRef,
- ERL_NIF_TERM recvRef)
+ERL_NIF_TERM recv_check_fail_econnreset(ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM recvRef)
{
- ERL_NIF_TERM res = esock_make_error(env, atom_closed);
- int sres;
+ ERL_NIF_TERM reason = atom_econnreset;
+ ERL_NIF_TERM res = esock_make_error(env, atom_econnreset);
/* <KOLLA>
*
@@ -16239,16 +16354,18 @@ ERL_NIF_TERM recv_check_fail_closed(ErlNifEnv* env,
* </KOLLA>
*/
+ recv_error_current_reader(env, descP, sockRef, reason);
+
+ MUNLOCK(descP->readMtx);
+
+ MLOCK(descP->closeMtx);
+ MLOCK(descP->readMtx);
+
+ descP->isReadable = FALSE;
descP->closeLocal = FALSE;
descP->state = ESOCK_STATE_CLOSING;
- recv_error_current_reader(env, descP, sockRef, res);
-
- if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) {
- esock_warning_msg("Failed stop select (closed) "
- "for current reader (%T): %d\r\n",
- recvRef, sres);
- }
+ MUNLOCK(descP->closeMtx);
return res;
}
@@ -16301,11 +16418,11 @@ ERL_NIF_TERM recv_check_fail_gen(ErlNifEnv* env,
int saveErrno,
ERL_NIF_TERM sockRef)
{
- ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno);
+ ERL_NIF_TERM reason = MKA(env, erl_errno_id(saveErrno));
- recv_error_current_reader(env, descP, sockRef, res);
+ recv_error_current_reader(env, descP, sockRef, reason);
- return res;
+ return esock_make_error(env, reason);
}
@@ -16449,7 +16566,7 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env,
/* The recvfrom function delivers one (1) message. If our buffer
- * is to small, the message will be truncated. So, regardless
+ * is too small, the message will be truncated. So, regardless
* if we filled the buffer or not, we have got what we are going
* to get regarding this message.
*/
@@ -18862,10 +18979,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "esock.w[%d]", sock);
descP->writeMtx = MCREATE(buf);
- enif_set_pid_undefined(&descP->currentWriter.pid);
- MON_INIT(&descP->currentWriter.mon);
- descP->currentWriter.env = NULL;
- descP->currentWriter.ref = esock_atom_undefined;
+ requestor_clear(&descP->currentWriter);
descP->currentWriterP = NULL; // currentWriter not used
descP->writersQ.first = NULL;
descP->writersQ.last = NULL;
@@ -18880,10 +18994,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "esock.r[%d]", sock);
descP->readMtx = MCREATE(buf);
- enif_set_pid_undefined(&descP->currentReader.pid);
- MON_INIT(&descP->currentReader.mon);
- descP->currentReader.env = NULL;
- descP->currentReader.ref = esock_atom_undefined;
+ requestor_clear(&descP->currentReader);
descP->currentReaderP = NULL; // currentReader not used
descP->readersQ.first = NULL;
descP->readersQ.last = NULL;
@@ -18898,10 +19009,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "esock.acc[%d]", sock);
descP->accMtx = MCREATE(buf);
- enif_set_pid_undefined(&descP->currentAcceptor.pid);
- MON_INIT(&descP->currentAcceptor.mon);
- descP->currentAcceptor.env = NULL;
- descP->currentAcceptor.ref = esock_atom_undefined;
+ requestor_clear(&descP->currentAcceptor);
descP->currentAcceptorP = NULL; // currentAcceptor not used
descP->acceptorsQ.first = NULL;
descP->acceptorsQ.last = NULL;
@@ -18920,7 +19028,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "esock.cfg[%d]", sock);
descP->cfgMtx = MCREATE(buf);
descP->rBufSz = ESOCK_RECV_BUFFER_SIZE_DEFAULT;
- descP->rNum = 0;
+ descP->rNum = ESOCK_RECV_BUFFER_COUNT_DEFAULT;
descP->rNumCnt = 0;
descP->rCtrlSz = ESOCK_RECV_CTRL_BUFFER_SIZE_DEFAULT;
descP->wCtrlSz = ESOCK_SEND_CTRL_BUFFER_SIZE_DEFAULT;
@@ -19621,6 +19729,7 @@ char* esock_send_close_msg(ErlNifEnv* env,
{
ERL_NIF_TERM sockRef, msg;
ErlNifEnv* menv;
+ char* result;
if (descP->closeEnv != NULL) {
sockRef = enif_make_resource(descP->closeEnv, descP);
@@ -19632,7 +19741,9 @@ char* esock_send_close_msg(ErlNifEnv* env,
menv = NULL; // This has the effect that the message will be copied
}
- return esock_send_msg(env, pid, msg, menv);
+ result = esock_send_msg(env, pid, msg, menv);
+ descP->closeEnv = NULL;
+ return result;
}
@@ -19893,6 +20004,14 @@ int esock_select_write(ErlNifEnv* env,
}
+/* *** esock_select_stop ***
+ *
+ * WARNING: enif_select may call esock_stop directly
+ * in which case deadlock is avoided by esock_stop that checks
+ * if it got a direct call and then does not lock closeMtx.
+ *
+ * So closeMtx is supposed to be locked when this function is called.
+ */
static
int esock_select_stop(ErlNifEnv* env,
ErlNifEvent event,
@@ -20164,15 +20283,19 @@ BOOLEAN_T requestor_pop(ESockRequestQueue* q,
return TRUE;
} else {
/* Queue was empty */
- enif_set_pid_undefined(&reqP->pid);
- MON_INIT(&reqP->mon);
- reqP->env = NULL;
- reqP->ref = esock_atom_undefined; // Just in case
+ requestor_clear(reqP);
return FALSE;
}
}
+static void requestor_clear(ESockRequestor* reqP) {
+ enif_set_pid_undefined(&reqP->pid);
+ MON_INIT(&reqP->mon);
+ reqP->env = NULL;
+ reqP->ref = esock_atom_undefined;
+}
+
static
BOOLEAN_T qsearch4pid(ErlNifEnv* env,
@@ -20375,7 +20498,7 @@ int esock_demonitor(const char* slogan,
res = enif_demonitor_process(env, descP, &monP->mon);
if (res == 0) {
- esock_monitor_init(monP);
+ MON_INIT(monP);
} else {
SSDBG( descP,
("SOCKET", "[%d][%T] %s: demonitor failed: %d\r\n",
@@ -20404,6 +20527,17 @@ ERL_NIF_TERM esock_make_monitor_term(ErlNifEnv* env, const ESockMonitor* monP)
+static
+void esock_release_current(const char* slogan,
+ ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ESockRequestor* current)
+{
+ DEMONP(slogan, env, descP, &current->mon);
+ esock_free_env(slogan, current->env);
+ requestor_clear(current);
+}
+
#endif // if !defined(__WIN32__)
@@ -20435,12 +20569,12 @@ void esock_dtor(ErlNifEnv* env, void* obj)
#if !defined(__WIN32__)
ESockDescriptor* descP = (ESockDescriptor*) obj;
- SGDBG( ("SOCKET", "dtor -> try destroy write mutex\r\n") );
- MDESTROY(descP->writeMtx); descP->writeMtx = NULL;
-
SGDBG( ("SOCKET", "dtor -> try destroy read mutex\r\n") );
MDESTROY(descP->readMtx); descP->readMtx = NULL;
+ SGDBG( ("SOCKET", "dtor -> try destroy write mutex\r\n") );
+ MDESTROY(descP->writeMtx); descP->writeMtx = NULL;
+
SGDBG( ("SOCKET", "dtor -> try destroy accept mutex\r\n") );
MDESTROY(descP->accMtx); descP->accMtx = NULL;
@@ -20510,12 +20644,14 @@ void esock_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
((is_direct_call) ? "called" : "scheduled"), descP->sock, fd) );
/* +++ Lock it down +++ */
-
- MLOCK(descP->writeMtx);
+
+ /* If we are called with a direct call; we already have closeMtx
+ */
+ if (!is_direct_call) MLOCK(descP->closeMtx);
MLOCK(descP->readMtx);
MLOCK(descP->accMtx);
+ MLOCK(descP->writeMtx);
MLOCK(descP->cfgMtx);
- if (!is_direct_call) MLOCK(descP->closeMtx);
SSDBG( descP, ("SOCKET", "esock_stop -> "
"[%d, %T] all mutex(s) locked when counters:"
@@ -20668,15 +20804,18 @@ void esock_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
}
}
- esock_free_env("esoc_stop - meta-env", descP->meta.env);
+ if (descP->meta.env != NULL) {
+ esock_free_env("esock_stop - meta-env", descP->meta.env);
+ descP->meta.env = NULL;
+ }
SSDBG( descP, ("SOCKET", "esock_stop -> unlock all mutex(s)\r\n") );
- if (!is_direct_call) MUNLOCK(descP->closeMtx);
MUNLOCK(descP->cfgMtx);
+ MUNLOCK(descP->writeMtx);
MUNLOCK(descP->accMtx);
MUNLOCK(descP->readMtx);
- MUNLOCK(descP->writeMtx);
+ if (!is_direct_call) MUNLOCK(descP->closeMtx);
/* And finally update the registry */
esock_send_reg_del_msg(env, sockRef);
@@ -20706,7 +20845,8 @@ void esock_stop_handle_current(ErlNifEnv* env,
DEMONP("esock_stop_handle_current", env, descP, &reqP->mon);
- if (COMPARE_PIDS(&descP->closerPid, &reqP->pid) != 0) {
+ if ((! enif_is_pid_undefined(&descP->closerPid)) &&
+ (COMPARE_PIDS(&descP->closerPid, &reqP->pid) != 0)) {
SSDBG( descP, ("SOCKET", "esock_stop_handle_current -> "
"send abort message to current %s %T\r\n",
@@ -20715,9 +20855,10 @@ void esock_stop_handle_current(ErlNifEnv* env,
if (esock_send_abort_msg(env, sockRef, reqP->ref, reqP->env,
atom_closed, &reqP->pid) != NULL) {
- esock_warning_msg("Failed sending abort (%T) message to "
+ esock_warning_msg("esock_stop_handle_current: "
+ "Failed sending abort (closed) message to "
"current %s %T\r\n",
- reqP->ref, role, reqP->pid);
+ role, reqP->pid);
}
reqP->env = NULL;
}
@@ -20769,9 +20910,10 @@ void inform_waiting_procs(ErlNifEnv* env,
reason,
&currentP->data.pid) != NULL) {
- esock_warning_msg("Failed sending abort (%T) message to "
+ esock_warning_msg("inform_waiting_procs: "
+ "Failed sending abort (%T) message to "
"current %s %T\r\n",
- currentP->data.ref,
+ reason,
role,
currentP->data.pid);
@@ -20825,6 +20967,8 @@ void esock_down(ErlNifEnv* env,
* we leave it to the stop callback function.
*/
+ MLOCK(descP->closeMtx);
+
SSDBG( descP,
("SOCKET", "esock_down -> controlling process exit\r\n") );
@@ -20917,6 +21061,8 @@ void esock_down(ErlNifEnv* env,
MON2T(env, mon));
}
+ MUNLOCK(descP->closeMtx);
+
} else if (COMPARE_PIDS(&descP->connPid, pid) == 0) {
/* The connPid is only set during the connection.
@@ -20940,19 +21086,19 @@ void esock_down(ErlNifEnv* env,
sockRef = enif_make_resource(env, descP);
+ MLOCK(descP->readMtx);
MLOCK(descP->accMtx);
+ MLOCK(descP->writeMtx);
+
+ if (descP->currentReaderP != NULL)
+ esock_down_reader(env, descP, sockRef, pid);
if (descP->currentAcceptorP != NULL)
esock_down_acceptor(env, descP, sockRef, pid);
- MUNLOCK(descP->accMtx);
-
- MLOCK(descP->writeMtx);
if (descP->currentWriterP != NULL)
esock_down_writer(env, descP, sockRef, pid);
- MUNLOCK(descP->writeMtx);
- MLOCK(descP->readMtx);
- if (descP->currentReaderP != NULL)
- esock_down_reader(env, descP, sockRef, pid);
+ MUNLOCK(descP->writeMtx);
+ MUNLOCK(descP->accMtx);
MUNLOCK(descP->readMtx);
}
@@ -20988,12 +21134,8 @@ void esock_down_acceptor(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "esock_down_acceptor -> no more writers\r\n") );
- descP->state = ESOCK_STATE_LISTENING;
-
- descP->currentAcceptorP = NULL;
- descP->currentAcceptor.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentAcceptor.pid);
- esock_monitor_init(&descP->currentAcceptor.mon);
+ descP->state = ESOCK_STATE_LISTENING;
+ descP->currentAcceptorP = NULL;
}
} else {
@@ -21029,12 +21171,11 @@ void esock_down_writer(ErlNifEnv* env,
"current writer - try activate next\r\n") );
if (!activate_next_writer(env, descP, sockRef)) {
+
SSDBG( descP, ("SOCKET",
"esock_down_writer -> no active writer\r\n") );
+
descP->currentWriterP = NULL;
- descP->currentWriter.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentWriter.pid);
- esock_monitor_init(&descP->currentWriter.mon);
}
} else {
@@ -21070,13 +21211,12 @@ void esock_down_reader(ErlNifEnv* env,
"current reader - try activate next\r\n") );
if (!activate_next_reader(env, descP, sockRef)) {
+
SSDBG( descP,
("SOCKET",
"esock_down_reader -> no more readers\r\n") );
+
descP->currentReaderP = NULL;
- descP->currentReader.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentReader.pid);
- esock_monitor_init(&descP->currentReader.mon);
}
} else {
diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c
index 6506ddc1e8..a30c2e36ae 100644
--- a/erts/emulator/nifs/common/socket_util.c
+++ b/erts/emulator/nifs/common/socket_util.c
@@ -1092,50 +1092,43 @@ char* esock_decode_timeval(ErlNifEnv* env,
if (!GET_MAP_VAL(env, eTime, esock_atom_usec, &eUSec))
return ESOCK_STR_EINVAL;
- /* On some platforms (e.g. OpenBSD) this is a 'long long' and on others
- * (e.g. Linux) its a long.
- * As long as they are both 64 bits, its easy (use our own signed 64-bit int
- * and then cast). But if they are either not 64 bit, or they are of different size
- * then we make it easy on ourselves and use long and then cast to whatever
- * type sec is.
+ /* Use the appropriate variable type and nif function
+ * to decode the value from Erlang into the struct timeval fields
*/
-#if (SIZEOF_LONG_LONG == SIZEOF_LONG) && (SIZEOF_LONG == 8)
- {
+ { /* time_t tv_sec; */
+#if (SIZEOF_TIME_T == 8)
ErlNifSInt64 sec;
if (!GET_INT64(env, eSec, &sec))
return ESOCK_STR_EINVAL;
- timeP->tv_sec = (typeof(timeP->tv_sec)) sec;
- }
-#else
- {
+#elif (SIZEOF_TIME_T == SIZEOF_INT)
+ int sec;
+ if (!GET_INT(env, eSec, &sec))
+ return ESOCK_STR_EINVAL;
+#else /* long or other e.g undefined */
long sec;
if (!GET_LONG(env, eSec, &sec))
return ESOCK_STR_EINVAL;
- timeP->tv_sec = (typeof(timeP->tv_sec)) sec;
- }
#endif
+ timeP->tv_sec = sec;
+ }
- #if (SIZEOF_INT == 4)
- {
+ { /* suseconds_t tv_usec; */
+#if (SIZEOF_SUSECONDS_T == 8)
+ ErlNifSInt64 usec;
+ if (!GET_INT64(env, eSec, &usec))
+ return ESOCK_STR_EINVAL;
+#elif (SIZEOF_SUSECONDS_T == SIZEOF_INT)
int usec;
- if (!GET_INT(env, eUSec, &usec))
+ if (!GET_INT(env, eSec, &usec))
return ESOCK_STR_EINVAL;
- timeP->tv_usec = (typeof(timeP->tv_usec)) usec;
- }
-#elif (SIZEOF_LONG == 4)
- {
+#else /* long or other e.g undefined */
long usec;
- if (!GET_LONG(env, eUSec, &usec))
+ if (!GET_LONG(env, eSec, &usec))
return ESOCK_STR_EINVAL;
- timeP->tv_usec = (typeof(timeP->tv_usec)) usec;
+#endif
+ timeP->tv_usec = usec;
}
-#else
- /* Ok, we give up... */
- if (!GET_LONG(env, eUSec, &timeP->tv_usec))
- return ESOCK_STR_EINVAL;
-#endif
-
return NULL;
}
diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl
index 6d36caa5e0..bce4ae1a5f 100644
--- a/erts/emulator/test/socket_SUITE.erl
+++ b/erts/emulator/test/socket_SUITE.erl
@@ -15826,6 +15826,16 @@ api_opt_sock_timestamp_tcp(InitState) ->
ERROR
end
end},
+ %% Linux pecularity observed here...
+ %% Detected on Kernel 4.15.0-72 x96_64.
+ %% The option set to enable receiving timestamps just above
+ %% has failed to be effective down in "await recv reply 2
+ %% (from server, w timestamp)" below, unless we put the
+ %% sleep between setting the option and informing
+ %% the writer that it shall write to the other socket end.
+ %% A sleep 1 ms improves a lot but does not remove
+ %% problem completely. Believe it or not.
+ ?SEV_SLEEP(100),
#{desc => "announce ready (timestamp on)",
cmd => fun(#{tester := Tester}) ->
?SEV_ANNOUNCE_READY(Tester, timestamp_on),
@@ -15846,7 +15856,7 @@ api_opt_sock_timestamp_tcp(InitState) ->
ok
end},
#{desc => "await recv reply 2 (from server, w timestamp)",
- cmd => fun(#{sock := Sock, recv := Recv}) ->
+ cmd => fun(#{sock := Sock, recv := Recv, get := Get}) ->
case Recv(Sock) of
{ok, {[#{level := socket,
type := timestamp,
@@ -15856,6 +15866,8 @@ api_opt_sock_timestamp_tcp(InitState) ->
"~n ~p", [TS]),
ok;
{ok, {BadCMsgHdrs, ?BASIC_REP}} ->
+ ?SEV_EPRINT("Current timestamp value:"
+ " ~p", [Get(Sock)]),
{error, {unexpected_reply_cmsghdrs,
BadCMsgHdrs}};
{ok, {[#{level := socket,
@@ -16012,7 +16024,7 @@ api_opt_sock_timestamp_tcp(InitState) ->
?SEV_ANNOUNCE_CONTINUE(Server, accept),
ok
end},
- ?SEV_SLEEP(?SECS(1)),
+%%% ?SEV_SLEEP(?SECS(1)),
#{desc => "order client to continue (with connect)",
cmd => fun(#{client := Client} = _State) ->
?SEV_ANNOUNCE_CONTINUE(Client, connect),
@@ -16187,8 +16199,6 @@ api_opt_sock_timestamp_tcp(InitState) ->
ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]).
-
-
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Tests that the add_mambership and drop_membership ip options work.
@@ -27701,8 +27711,7 @@ traffic_send_and_recv_tcp(InitState) ->
local_sa := #{path := Path}}) ->
?SEV_ANNOUNCE_READY(Tester, init, Path),
ok;
- (#{lsock := LSock,
- tester := Tester,
+ (#{tester := Tester,
lport := Port}) ->
?SEV_ANNOUNCE_READY(Tester, init, Port),
ok
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index 9fcb4ff712..1cffd45f05 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 2e38ede125..eb1bcdce66 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -778,6 +778,7 @@
-define(ESOCK_OPT_OTP_DOMAIN, 16#FF01). % INTERNAL
-define(ESOCK_OPT_OTP_TYPE, 16#FF02). % INTERNAL
-define(ESOCK_OPT_OTP_PROTOCOL, 16#FF03). % INTERNAL
+-define(ESOCK_OPT_OTP_DTP, 16#FF04). % INTERNAL
%% *** SOCKET (socket) options
-define(ESOCK_OPT_SOCK_ACCEPTCONN, 1).
@@ -1104,7 +1105,7 @@ supports() ->
{recv_flags, supports(recv_flags)}].
--dialyzer({nowarn_function, supports/1}).
+-dialyzer({no_contracts, supports/1}).
-spec supports(options) -> supports_options();
(sctp) -> boolean();
(ipv6) -> boolean();
@@ -1129,7 +1130,7 @@ supports(recv_flags) ->
supports(_Key1) ->
false.
--dialyzer({nowarn_function, supports/2}).
+-dialyzer({no_contracts, supports/2}).
-spec supports(options, socket) -> supports_options_socket();
(options, ip) -> supports_options_ip();
(options, ipv6) -> supports_options_ipv6();
@@ -1152,7 +1153,6 @@ supports(_Key1, _Level) ->
false.
--dialyzer({nowarn_function, supports/3}).
-spec supports(options, socket, Opt :: socket_option()) -> boolean();
(options, ip, Opt :: ip_socket_option()) -> boolean();
(options, ipv6, Opt :: ipv6_socket_option()) -> boolean();
@@ -1164,6 +1164,7 @@ supports(_Key1, _Level) ->
Key2 :: term(),
Key3 :: term().
+-dialyzer({no_contracts, supports/3}).
supports(options, Level, Opt) ->
case supports(options, Level) of
S when is_list(S) ->
@@ -1262,23 +1263,17 @@ open(Domain, Type, Protocol, Extra) when is_map(Extra) ->
EDomain = enc_domain(Domain),
EType = enc_type(Type),
EProtocol = enc_protocol(Protocol),
- case nif_open(EDomain, EType, EProtocol, Extra) of
- {ok, SockRef} ->
- Socket = #socket{ref = SockRef},
- {ok, Socket};
- {error, _} = ERROR ->
- ERROR
- end
+ nif_open(EDomain, EType, EProtocol, Extra)
end
+ of
+ {ok, SockRef} ->
+ Socket = #socket{ref = SockRef},
+ {ok, Socket};
+ {error, _} = ERROR ->
+ ERROR
catch
- throw:T ->
- T;
- %% <WIN32-TEMPORARY>
- error:notsup:S ->
- erlang:raise(error, notsup, S);
- %% </WIN32-TEMPORARY>
- error:Reason ->
- {error, Reason}
+ throw:ERROR ->
+ ERROR
end.
@@ -1300,31 +1295,23 @@ bind(#socket{ref = SockRef}, Addr)
when ((Addr =:= any) orelse
(Addr =:= broadcast) orelse
(Addr =:= loopback)) ->
- try which_domain(SockRef) of
- inet ->
- nif_bind(SockRef, ?SOCKADDR_IN4_DEFAULT(Addr));
- inet6 when (Addr =:= any) orelse (Addr =:= loopback) ->
- nif_bind(SockRef, ?SOCKADDR_IN6_DEFAULT(Addr));
- _ ->
- einval()
+ try
+ case which_domain(SockRef) of
+ inet ->
+ nif_bind(SockRef, ?SOCKADDR_IN4_DEFAULT(Addr));
+ inet6 when (Addr =:= any) orelse (Addr =:= loopback) ->
+ nif_bind(SockRef, ?SOCKADDR_IN6_DEFAULT(Addr));
+ Domain ->
+ invalid_domain(Domain)
+ end
catch
- %% <WIN32-TEMPORARY>
- error:notsup:S ->
- erlang:raise(error, notsup, S);
- %% </WIN32-TEMPORARY>
throw:ERROR ->
ERROR
end;
bind(#socket{ref = SockRef} = _Socket, Addr) when is_map(Addr) ->
try
- begin
- nif_bind(SockRef, ensure_sockaddr(Addr))
- end
+ nif_bind(SockRef, ensure_sockaddr(Addr))
catch
- %% <WIN32-TEMPORARY>
- error:notsup:S ->
- erlang:raise(error, notsup, S);
- %% </WIN32-TEMPORARY>
throw:ERROR ->
ERROR
end.
@@ -1353,35 +1340,28 @@ bind(#socket{ref = SockRef}, Addrs, Action)
when is_list(Addrs) andalso ((Action =:= add) orelse (Action =:= remove)) ->
try
begin
- ensure_type(SockRef, seqpacket),
- ensure_proto(SockRef, sctp),
- validate_addrs(which_domain(SockRef), Addrs),
+ {Domain, Type, Proto} = which_dtp(SockRef),
+ ensure_domain(Domain, [inet, inet6]),
+ ensure_type(Type, seqpacket),
+ ensure_protocol(Proto, sctp),
+ validate_addrs(Domain, Addrs),
nif_bind(SockRef, Addrs, Action)
end
catch
- %% <WIN32-TEMPORARY>
- error:notsup:S ->
- erlang:raise(error, notsup, S);
- %% </WIN32-TEMPORARY>
throw:ERROR ->
ERROR
end.
-ensure_type(SockRef, Type) ->
- case which_type(SockRef) of
- Type ->
- ok;
- _InvalidType ->
- einval()
- end.
+ensure_domain(Domain, [Domain | _]) -> ok;
+ensure_domain(Domain, [_ | Domains]) -> ensure_domain(Domain, Domains);
+ensure_domain(Domain, []) -> invalid_domain(Domain).
+
+ensure_type(Type, Type) -> ok;
+ensure_type(Type, _) -> invalid_type(Type).
+
+ensure_protocol(Proto, Proto) -> ok;
+ensure_protocol(Proto, _) -> invalid_protocol(Proto).
-ensure_proto(SockRef, Proto) ->
- case which_protocol(SockRef) of
- Proto ->
- ok;
- _InvalidProto ->
- einval()
- end.
validate_addrs(inet = _Domain, Addrs) ->
validate_inet_addrs(Addrs);
@@ -1441,38 +1421,45 @@ connect(Socket, SockAddr) ->
%% <KOLLA>
%% Is it possible to connect with family = local for the (dest) sockaddr?
%% </KOLLA>
-connect(_Socket, _SockAddr, Timeout)
- when (is_integer(Timeout) andalso (Timeout =< 0)) ->
- {error, timeout};
-connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout)
- when ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso
- ((Timeout =:= nowait) orelse
- (Timeout =:= infinity) orelse is_integer(Timeout)) ->
- TS = timestamp(Timeout),
+connect(#socket{ref = SockRef}, SockAddr, Timeout) ->
+ try
+ do_connect(SockRef, ensure_sockaddr(SockAddr), deadline(Timeout))
+ catch
+ throw:ERROR ->
+ ERROR
+ end.
+
+
+do_connect(SockRef, SockAddr, Deadline) ->
case nif_connect(SockRef, ensure_sockaddr(SockAddr)) of
+
ok ->
%% Connected!
ok;
- {ok, Ref} when (Timeout =:= nowait) ->
+
+ {ok, Ref} when (Deadline =:= nowait) ->
%% Connecting, but the caller does not want to wait...
?SELECT(connect, Ref);
{ok, Ref} ->
%% Connecting...
- NewTimeout = next_timeout(TS, Timeout),
- receive
+ Timeout = timeout(Deadline),
+ receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, Ref} ->
- nif_finalize_connection(SockRef)
- after NewTimeout ->
+ nif_finalize_connection(SockRef)
+ after Timeout ->
cancel(SockRef, connect, Ref),
- {error, timeout}
- end;
- {error, _} = ERROR ->
- ERROR
+ {error, timeout}
+ end;
+
+
+ {error, _} = ERROR ->
+ ERROR
end.
+
%% ===========================================================================
%%
%% listen - listen for connections on a socket
@@ -1524,45 +1511,44 @@ accept(Socket) ->
Socket :: socket(),
Reason :: term().
-%% Do we really need this optimization?
-accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) ->
- {error, timeout};
-accept(#socket{ref = LSockRef}, Timeout)
- when is_integer(Timeout) orelse
- (Timeout =:= infinity) orelse
- (Timeout =:= nowait) ->
- do_accept(LSockRef, Timeout).
-
-do_accept(LSockRef, Timeout) ->
- TS = timestamp(Timeout),
+accept(#socket{ref = LSockRef}, Timeout) ->
+ try
+ do_accept(LSockRef, deadline(Timeout))
+ catch
+ throw:ERROR ->
+ ERROR
+ end.
+
+do_accept(LSockRef, Deadline) ->
AccRef = make_ref(),
case nif_accept(LSockRef, AccRef) of
+
{ok, SockRef} ->
Socket = #socket{ref = SockRef},
{ok, Socket};
- {error, eagain} when (Timeout =:= nowait) ->
+ {error, eagain} when (Deadline =:= nowait) ->
?SELECT(accept, AccRef);
-
{error, eagain} ->
%% Each call is non-blocking, but even then it takes
%% *some* time, so just to be sure, recalculate before
%% the receive.
- NewTimeout = next_timeout(TS, Timeout),
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = LSockRef}, select, AccRef} ->
- do_accept(LSockRef, next_timeout(TS, Timeout));
+ do_accept(LSockRef, Deadline);
{?ESOCK_TAG, _Socket, abort, {AccRef, Reason}} ->
{error, Reason}
- after NewTimeout ->
+ after Timeout ->
cancel(LSockRef, accept, AccRef),
{error, timeout}
end;
+
{error, _} = ERROR ->
cancel(LSockRef, accept, AccRef), % Just to be on the safe side...
ERROR
@@ -1587,17 +1573,19 @@ send(Socket, Data) ->
Socket :: socket(),
Data :: iodata(),
Flags :: send_flags(),
- Reason :: term()
- ; (Socket, Data, Timeout :: nowait) -> ok |
- {select, SelectInfo} |
- {ok, {RestData, SelectInfo}} |
- {error, Reason} when
+ Reason :: term();
+ (Socket, Data, Timeout :: nowait) ->
+ ok |
+ {ok, {binary(), SelectInfo}} |
+ {select, SelectInfo} |
+ {ok, {RestData, SelectInfo}} |
+ {error, Reason} when
Socket :: socket(),
Data :: iodata(),
RestData :: binary(),
SelectInfo :: select_info(),
- Reason :: term()
- ; (Socket, Data, Timeout) -> ok | {error, Reason} when
+ Reason :: term();
+ (Socket, Data, Timeout) -> ok | {error, Reason} when
Socket :: socket(),
Data :: iodata(),
Timeout :: timeout(),
@@ -1629,75 +1617,95 @@ send(Socket, Data, Flags, Timeout) when is_list(Data) ->
Bin = erlang:list_to_binary(Data),
send(Socket, Bin, Flags, Timeout);
send(#socket{ref = SockRef}, Data, Flags, Timeout)
- when is_binary(Data) andalso
- is_list(Flags) andalso
- ((Timeout =:= nowait) orelse
- (Timeout =:= infinity) orelse
- (is_integer(Timeout) andalso (Timeout > 0))) ->
- EFlags = enc_send_flags(Flags),
- do_send(SockRef, Data, EFlags, Timeout).
-
-do_send(SockRef, Data, EFlags, Timeout) ->
- TS = timestamp(Timeout),
+ when is_binary(Data), is_list(Flags) ->
+ To = undefined,
+ try
+ begin
+ EFlags = enc_send_flags(Flags),
+ Deadline = deadline(Timeout),
+ send_common(SockRef, Data, To, EFlags, Deadline, send)
+ end
+ catch
+ throw:ERROR ->
+ ERROR
+ end.
+
+send_common(SockRef, Data, To, EFlags, Deadline, SendName) ->
+
SendRef = make_ref(),
- case nif_send(SockRef, SendRef, Data, EFlags) of
- ok ->
- ok;
+ case
+ case SendName of
+ send ->
+ nif_send(SockRef, SendRef, Data, EFlags);
+ sendto ->
+ nif_sendto(SockRef, SendRef, Data, To, EFlags)
+ end
+ of
- {ok, Written} when (Timeout =:= nowait) ->
- <<_:Written/binary, Rest/binary>> = Data,
+ ok -> ok;
+
+
+ {ok, Written} when (Deadline =:= nowait) ->
%% We are partially done, but the user don't want to wait (here)
%% for completion
- {ok, {Rest, ?SELECT_INFO(send, SendRef)}};
-
+ <<_:Written/binary, Rest/binary>> = Data,
+ {ok, {Rest, ?SELECT_INFO(SendName, SendRef)}};
{ok, Written} ->
- NewTimeout = next_timeout(TS, Timeout),
%% We are partially done, wait for continuation
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef}
when (Written > 0) ->
<<_:Written/binary, Rest/binary>> = Data,
- do_send(SockRef, Rest, EFlags,
- next_timeout(TS, Timeout));
+ send_common(
+ SockRef, Rest, To, EFlags, Deadline, SendName);
{?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_send(SockRef, Data, EFlags,
- next_timeout(TS, Timeout));
+ send_common(
+ SockRef, Data, To, EFlags, Deadline, SendName);
{?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} ->
- {error, Reason}
+ {error, {Reason, size(Data)}}
- after NewTimeout ->
- cancel(SockRef, send, SendRef),
+ after Timeout ->
+ _ = cancel(SockRef, SendName, SendRef),
{error, {timeout, size(Data)}}
end;
- {error, eagain} when (Timeout =:= nowait) ->
- ?SELECT(send, SendRef);
+ {error, exbusy} = Error when Deadline =:= nowait -> Error;
+
+ {error, exbusy = Reason} ->
+ %% Internal error:
+ %% we called send, got eagain, and called send again
+ %% - without waiting for select message
+ erlang:error(Reason);
+
+ {error, eagain} when (Deadline =:= nowait) ->
+ ?SELECT(SendName, SendRef);
{error, eagain} ->
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_send(SockRef, Data, EFlags,
- next_timeout(TS, Timeout));
+ send_common(
+ SockRef, Data, To, EFlags, Deadline, SendName);
{?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} ->
- {error, Reason}
+ {error, {Reason, size(Data)}}
after Timeout ->
- cancel(SockRef, send, SendRef),
+ _ = cancel(SockRef, SendName, SendRef),
{error, {timeout, size(Data)}}
end;
- {error, _} = ERROR ->
- ERROR
- end.
-
+ {error, Reason} ->
+ {error, {Reason, size(Data)}}
+ end.
%% ---------------------------------------------------------------------------
@@ -1740,9 +1748,11 @@ sendto(Socket, Data, Dest, Timeout) ->
sendto(Socket, Data, Dest, ?ESOCK_SENDTO_FLAGS_DEFAULT, Timeout).
--spec sendto(Socket, Data, Dest, Flags, nowait) -> ok |
- {select, SelectInfo} |
- {error, Reason} when
+-spec sendto(Socket, Data, Dest, Flags, nowait) ->
+ ok |
+ {ok, {binary(), SelectInfo}} |
+ {select, SelectInfo} |
+ {error, Reason} when
Socket :: socket(),
Data :: binary(),
Dest :: sockaddr(),
@@ -1760,75 +1770,21 @@ sendto(Socket, Data, Dest, Timeout) ->
sendto(Socket, Data, Dest, Flags, Timeout) when is_list(Data) ->
Bin = erlang:list_to_binary(Data),
sendto(Socket, Bin, Dest, Flags, Timeout);
-sendto(#socket{ref = SockRef}, Data, #{family := Fam} = Dest, Flags, Timeout)
- when is_binary(Data) andalso
- ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso
- is_list(Flags) andalso
- ((Timeout =:= nowait) orelse
- (Timeout =:= infinity) orelse
- (is_integer(Timeout) andalso (Timeout > 0))) ->
- EFlags = enc_send_flags(Flags),
- do_sendto(SockRef, Data, ensure_sockaddr(Dest), EFlags, Timeout).
-
-do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
- TS = timestamp(Timeout),
- SendRef = make_ref(),
- case nif_sendto(SockRef, SendRef, Data, Dest, EFlags) of
- ok ->
- %% We are done
- ok;
-
- {ok, Written} when (Timeout =:= nowait) ->
- <<_:Written/binary, Rest/binary>> = Data,
- {ok, {Rest, ?SELECT_INFO(sendto, SendRef)}};
-
-
- {ok, Written} ->
- %% We are partially done, wait for continuation
- receive
- {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef}
- when (Written > 0) ->
- <<_:Written/binary, Rest/binary>> = Data,
- do_sendto(SockRef, Rest, Dest, EFlags,
- next_timeout(TS, Timeout));
-
- {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_sendto(SockRef, Data, Dest, EFlags,
- next_timeout(TS, Timeout));
-
- {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} ->
- {error, Reason}
-
- after Timeout ->
- cancel(SockRef, sendto, SendRef),
- {error, timeout}
- end;
-
-
- {error, eagain} when (Timeout =:= nowait) ->
- ?SELECT(sendto, SendRef);
-
-
- {error, eagain} ->
- receive
- {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_sendto(SockRef, Data, Dest, EFlags,
- next_timeout(TS, Timeout));
-
- {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} ->
- {error, Reason}
-
- after Timeout ->
- cancel(SockRef, sendto, SendRef),
- {error, timeout}
- end;
-
- {error, _} = ERROR ->
+sendto(#socket{ref = SockRef}, Data, Dest, Flags, Timeout)
+ when is_binary(Data), is_list(Flags) ->
+ try
+ begin
+ To = ensure_sockaddr(Dest),
+ EFlags = enc_send_flags(Flags),
+ Deadline = deadline(Timeout),
+ send_common(SockRef, Data, To, EFlags, Deadline, sendto)
+ end
+ catch
+ throw:ERROR ->
ERROR
end.
-
%% ---------------------------------------------------------------------------
%%
%% The only part of the msghdr() that *must* exist (a connected
@@ -1837,9 +1793,13 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
%% used when sending.
%%
--spec sendmsg(Socket, MsgHdr) -> ok | {error, Reason} when
+-spec sendmsg(Socket, MsgHdr) ->
+ ok |
+ {ok, Remaining} |
+ {error, Reason} when
Socket :: socket(),
MsgHdr :: msghdr(),
+ Remaining :: erlang:iovec(),
Reason :: term().
sendmsg(Socket, MsgHdr) ->
@@ -1851,15 +1811,16 @@ sendmsg(Socket, MsgHdr) ->
Socket :: socket(),
MsgHdr :: msghdr(),
Flags :: send_flags(),
- Reason :: term()
- ; (Socket, MsgHdr, Timeout :: nowait) -> ok |
- {select, SelectInfo} |
- {error, Reason} when
+ Reason :: term();
+ (Socket, MsgHdr, Timeout :: nowait) ->
+ ok |
+ {ok, Remaining} |
+ {error, Reason} when
Socket :: socket(),
MsgHdr :: msghdr(),
- SelectInfo :: select_info(),
- Reason :: term()
- ; (Socket, MsgHdr, Timeout) -> ok | {error, Reason} when
+ Remaining :: erlang:iovec(),
+ Reason :: term();
+ (Socket, MsgHdr, Timeout) -> ok | {error, Reason} when
Socket :: socket(),
MsgHdr :: msghdr(),
Timeout :: timeout(),
@@ -1867,21 +1828,18 @@ sendmsg(Socket, MsgHdr) ->
sendmsg(Socket, MsgHdr, Flags) when is_list(Flags) ->
sendmsg(Socket, MsgHdr, Flags, ?ESOCK_SENDMSG_TIMEOUT_DEFAULT);
-sendmsg(Socket, MsgHdr, Timeout)
- when is_integer(Timeout) orelse (Timeout =:= infinity) ->
+sendmsg(Socket, MsgHdr, Timeout) ->
sendmsg(Socket, MsgHdr, ?ESOCK_SENDMSG_FLAGS_DEFAULT, Timeout).
-spec sendmsg(Socket, MsgHdr, Flags, nowait) ->
ok |
{ok, Remaining} |
- {select, SelectInfo} |
{error, Reason} when
Socket :: socket(),
MsgHdr :: msghdr(),
Flags :: send_flags(),
Remaining :: erlang:iovec(),
- SelectInfo :: select_info(),
Reason :: term()
; (Socket, MsgHdr, Flags, Timeout) ->
ok |
@@ -1895,25 +1853,24 @@ sendmsg(Socket, MsgHdr, Timeout)
Reason :: term().
sendmsg(#socket{ref = SockRef}, #{iov := IOV} = MsgHdr, Flags, Timeout)
- when is_list(IOV) andalso
- is_list(Flags) andalso
- ((Timeout =:= nowait) orelse
- (Timeout =:= infinity) orelse
- (is_integer(Timeout) andalso (Timeout > 0))) ->
- try ensure_msghdr(MsgHdr) of
- M ->
+ when is_list(IOV), is_list(Flags) ->
+ try
+ begin
+ M = ensure_msghdr(MsgHdr),
EFlags = enc_send_flags(Flags),
- do_sendmsg(SockRef, M, EFlags, Timeout)
+ Deadline = deadline(Timeout),
+ do_sendmsg(SockRef, M, EFlags, Deadline)
+ end
catch
- throw:T ->
- T;
- error:Reason ->
- {error, Reason}
+ throw:ERROR ->
+ ERROR
end.
-do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) ->
- TS = timestamp(Timeout),
+
+do_sendmsg(SockRef, MsgHdr, EFlags, Deadline) ->
+
SendRef = make_ref(),
+
case nif_sendmsg(SockRef, SendRef, MsgHdr, EFlags) of
ok ->
%% We are done
@@ -1925,28 +1882,39 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) ->
%% be able to handle a message being split. Leave it to
%% the caller to figure out (call again with the rest).
%%
- %% We should really not need to cancel, since this is
- %% accepted for sendmsg!
+ %% We need to cancel this partial write.
%%
- cancel(SockRef, sendmsg, SendRef),
+ _ = cancel(SockRef, sendmsg, SendRef),
{ok, do_sendmsg_rest(maps:get(iov, MsgHdr), Written)};
- {error, eagain} when (Timeout =:= nowait) ->
+ {error, exbusy} = Error when Deadline =:= nowait -> Error;
+
+ {error, exbusy = Reason} ->
+ %% Internal error:
+ %% we called send, got eagain, and called send again
+ %% - without waiting for select message
+ erlang:error(Reason);
+
+
+ {error, eagain} when (Deadline =:= nowait) ->
?SELECT(sendmsg, SendRef);
-
{error, eagain} ->
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} ->
- do_sendmsg(SockRef, MsgHdr, EFlags,
- next_timeout(TS, Timeout))
+ do_sendmsg(SockRef, MsgHdr, EFlags, Deadline);
+
+ {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} ->
+ {error, Reason}
after Timeout ->
- cancel(SockRef, sendmsg, SendRef),
+ _ = cancel(SockRef, sendmsg, SendRef),
{error, timeout}
end;
+
{error, _} = ERROR ->
ERROR
end.
@@ -1970,7 +1938,6 @@ ensure_msghdr(_) ->
-
%% ===========================================================================
%%
%% recv, recvfrom, recvmsg - receive a message from a socket
@@ -2062,154 +2029,134 @@ recv(Socket, Length, Timeout) ->
Reason :: term().
recv(#socket{ref = SockRef}, Length, Flags, Timeout)
- when (is_integer(Length) andalso (Length >= 0)) andalso
- is_list(Flags) andalso
- (is_integer(Timeout) orelse
- (Timeout =:= infinity) orelse
- (Timeout =:= nowait)) ->
- EFlags = enc_recv_flags(Flags),
- do_recv(SockRef, undefined, Length, EFlags, <<>>, Timeout).
-
-%% We need to pass the "old recv ref" around because of the special case
-%% with Length = 0. This case makes it neccessary to have a timeout function
-%% clause since we may never wait for anything (no receive select), and so the
-%% the only timeout check will be the function clause.
-%% Note that the Timeout value of 'nowait' has a special meaning. It means
+ when is_integer(Length), Length >= 0, is_list(Flags) ->
+ try
+ EFlags = enc_recv_flags(Flags),
+ Deadline = deadline(Timeout),
+ do_recv(SockRef, Length, EFlags, Deadline, <<>>)
+ catch
+ throw:ERROR ->
+ ERROR
+ end.
+
+%% We will only recurse with Length == 0 if Length is 0,
+%% so Length == 0 means to return all available data also when recursing
+%%
+%% Note that the Deadline value of 'nowait' has a special meaning. It means
%% that we will either return with data or with the with {error, NNNN}. In
%% wich case the caller will receive a select message at some later time.
-do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
- when (Timeout =:= nowait) orelse
- (Timeout =:= infinity) orelse
- (is_integer(Timeout) andalso (Timeout > 0)) ->
- TS = timestamp(Timeout),
+%%
+do_recv(SockRef, Length, EFlags, Deadline, Acc) ->
+
RecvRef = make_ref(),
case nif_recv(SockRef, RecvRef, Length, EFlags) of
- {ok, true = _Complete, Bin} when (size(Acc) =:= 0) ->
- {ok, Bin};
+
{ok, true = _Complete, Bin} ->
- {ok, <<Acc/binary, Bin/binary>>};
+ {ok, bincat(Acc, Bin)};
+
%% It depends on the amount of bytes we tried to read:
%% 0 - Read everything available
%% We got something, but there may be more - keep reading.
%% > 0 - We got a part of the message and we will be notified
%% when there is more to read (a select message)
- {ok, false = _Complete, Bin} when (Length =:= 0) ->
- do_recv(SockRef, RecvRef,
- Length, EFlags,
- <<Acc/binary, Bin/binary>>,
- next_timeout(TS, Timeout));
-
+ {ok, false = _Complete, Bin} when Length =:= 0 ->
+ Timeout = timeout(Deadline),
+ if
+ 0 < Timeout ->
+ do_recv(
+ SockRef, Length, EFlags, Deadline, bincat(Acc, Bin));
+ true ->
+ {ok, bincat(Acc, Bin)}
+ end;
%% Did not get all the user asked for, but the user also
%% specified 'nowait', so deliver what we got and the
%% select info.
- {ok, false = _Completed, Bin} when (Timeout =:= nowait) andalso
- (size(Acc) =:= 0) ->
- {ok, {Bin, ?SELECT_INFO(recv, RecvRef)}};
+ {ok, false = _Completed, Bin} when Deadline =:= nowait ->
+ {ok, {bincat(Acc, Bin), ?SELECT_INFO(recv, RecvRef)}};
-
- {ok, false = _Completed, Bin} when (size(Acc) =:= 0) ->
- %% We got the first chunk of it.
- %% We will be notified (select message) when there
- %% is more to read.
- NewTimeout = next_timeout(TS, Timeout),
+ {ok, false = _Completed, Bin} ->
+ %% We got a chunk of it!
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} ->
- do_recv(SockRef, RecvRef,
- Length-size(Bin), EFlags,
- Bin,
- next_timeout(TS, Timeout));
+ if
+ 0 < Timeout ->
+ do_recv(
+ SockRef, Length - byte_size(Bin), EFlags,
+ Deadline, bincat(Acc, Bin));
+ true ->
+ {error, {timeout, bincat(Acc, Bin)}}
+ end;
{?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} ->
{error, Reason}
- after NewTimeout ->
+ after Timeout ->
cancel(SockRef, recv, RecvRef),
- {error, {timeout, Acc}}
+ {error, {timeout, bincat(Acc, Bin)}}
end;
- {ok, false = _Completed, Bin} ->
- %% We got a chunk of it!
- NewTimeout = next_timeout(TS, Timeout),
- receive
- {?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} ->
- do_recv(SockRef, RecvRef,
- Length-size(Bin), EFlags,
- <<Acc/binary, Bin/binary>>,
- next_timeout(TS, Timeout));
- {?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} ->
- {error, Reason}
+ {error, exbusy} = Error when Deadline =:= nowait -> Error;
- after NewTimeout ->
- cancel(SockRef, recv, RecvRef),
- {error, {timeout, Acc}}
- end;
+ {error, exbusy = Reason} ->
+ %% Internal error:
+ %% we called recv, got eagain, and called recv again
+ %% - without waiting for select message
+ erlang:error(Reason);
%% The user does not want to wait!
%% The user will be informed that there is something to read
%% via the select socket message (see below).
-
- {error, eagain} when (Timeout =:= nowait) andalso (size(Acc) =:= 0) ->
- ?SELECT(recv, RecvRef);
- {error, eagain} when (Timeout =:= nowait) ->
- {ok, {Acc, ?SELECT_INFO(recv, RecvRef)}};
+ {error, eagain} when Deadline =:= nowait ->
+ if
+ byte_size(Acc) =:= 0 ->
+ ?SELECT(recv, RecvRef);
+ true ->
+ {ok, {Acc, ?SELECT_INFO(recv, RecvRef)}}
+ end;
%% We return with the accumulated binary (if its non-empty)
- {error, eagain} when (Length =:= 0) andalso (size(Acc) > 0) ->
- %% CAN WE REALLY DO THIS? THE NIF HAS SELECTED!! OR?
+ {error, eagain} when Length =:= 0, 0 < byte_size(Acc) ->
+ cancel(SockRef, recv, RecvRef),
{ok, Acc};
{error, eagain} ->
%% There is nothing just now, but we will be notified when there
%% is something to read (a select message).
- NewTimeout = next_timeout(TS, Timeout),
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} ->
- do_recv(SockRef, RecvRef,
- Length, EFlags,
- Acc,
- next_timeout(TS, Timeout));
+ if
+ 0 < Timeout ->
+ do_recv(
+ SockRef, Length, EFlags, Deadline, Acc);
+ 0 < byte_size(Acc) ->
+ {error, {timeout, Acc}};
+ true ->
+ {error, timeout}
+ end;
{?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} ->
{error, Reason}
- after NewTimeout ->
+ after Timeout ->
cancel(SockRef, recv, RecvRef),
{error, timeout}
end;
- {error, closed = Reason} ->
- do_close(SockRef),
- if
- (size(Acc) =:= 0) ->
- {error, Reason};
- true ->
- {error, {Reason, Acc}}
- end;
- {error, _} = ERROR when (size(Acc) =:= 0) ->
+ {error, _} = ERROR when byte_size(Acc) =:= 0 ->
ERROR;
{error, Reason} ->
{error, {Reason, Acc}}
- end;
-
-do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) ->
- %% The current recv operation is to be cancelled, so no need for a ref...
- %% The cancel will end our 'read everything you have' and "activate"
- %% any waiting reader.
- cancel(SockRef, recv, RecvRef),
- {ok, Acc};
-do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout)
- when (size(Acc) > 0) ->
- {error, {timeout, Acc}};
-do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) ->
- {error, timeout}.
+ end.
@@ -2324,43 +2271,54 @@ recvfrom(Socket, BufSz, Timeout) ->
Reason :: term().
recvfrom(#socket{ref = SockRef}, BufSz, Flags, Timeout)
- when (is_integer(BufSz) andalso (BufSz >= 0)) andalso
- is_list(Flags) andalso
- (is_integer(Timeout) orelse
- (Timeout =:= infinity) orelse
- (Timeout =:= nowait)) ->
- EFlags = enc_recv_flags(Flags),
- do_recvfrom(SockRef, BufSz, EFlags, Timeout).
-
-do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
- TS = timestamp(Timeout),
+ when is_integer(BufSz), 0 =< BufSz, is_list(Flags) ->
+ try
+ EFlags = enc_recv_flags(Flags),
+ Deadline = deadline(Timeout),
+ do_recvfrom(SockRef, BufSz, EFlags, Deadline)
+ catch
+ throw:ERROR ->
+ ERROR
+ end.
+
+do_recvfrom(SockRef, BufSz, EFlags, Deadline) ->
+
RecvRef = make_ref(),
case nif_recvfrom(SockRef, RecvRef, BufSz, EFlags) of
+
{ok, {_Source, _NewData}} = OK ->
OK;
- {error, eagain} when (Timeout =:= nowait) ->
- ?SELECT(recvfrom, RecvRef);
+ {error, exbusy} = Error when Deadline =:= nowait -> Error;
+ {error, exbusy = Reason} ->
+ %% Internal error:
+ %% we called recvfrom, got eagain, and called recvfrom again
+ %% - without waiting for select message
+ erlang:error(Reason);
+
+
+ {error, eagain} when Deadline =:= nowait ->
+ ?SELECT(recvfrom, RecvRef);
{error, eagain} ->
%% There is nothing just now, but we will be notified when there
%% is something to read (a select message).
- NewTimeout = next_timeout(TS, Timeout),
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} ->
- do_recvfrom(SockRef, BufSz, EFlags,
- next_timeout(TS, Timeout));
+ do_recvfrom(SockRef, BufSz, EFlags, Deadline);
{?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} ->
{error, Reason}
- after NewTimeout ->
+ after Timeout ->
cancel(SockRef, recvfrom, RecvRef),
{error, timeout}
end;
+
{error, _Reason} = ERROR ->
ERROR
@@ -2425,7 +2383,7 @@ recvmsg(Socket, Timeout) ->
recvmsg(Socket, Flags, Timeout) when is_list(Flags) ->
recvmsg(Socket, 0, 0, Flags, Timeout);
-recvmsg(Socket, BufSz, CtrlSz) when is_integer(BufSz) andalso is_integer(CtrlSz) ->
+recvmsg(Socket, BufSz, CtrlSz) when is_integer(BufSz), is_integer(CtrlSz) ->
recvmsg(Socket, BufSz, CtrlSz,
?ESOCK_RECV_FLAGS_DEFAULT, ?ESOCK_RECV_TIMEOUT_DEFAULT).
@@ -2454,47 +2412,55 @@ recvmsg(Socket, BufSz, CtrlSz) when is_integer(BufSz) andalso is_integer(CtrlSz)
Reason :: term().
recvmsg(#socket{ref = SockRef}, BufSz, CtrlSz, Flags, Timeout)
- when (is_integer(BufSz) andalso (BufSz >= 0)) andalso
- (is_integer(CtrlSz) andalso (CtrlSz >= 0)) andalso
- is_list(Flags) andalso
- (is_integer(Timeout) orelse
- (Timeout =:= infinity) orelse
- (Timeout =:= nowait)) ->
- EFlags = enc_recv_flags(Flags),
- do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout).
-
-do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
- TS = timestamp(Timeout),
+ when is_integer(BufSz), 0 =< BufSz,
+ is_integer(CtrlSz), 0 =< CtrlSz,
+ is_list(Flags) ->
+ try
+ EFlags = enc_recv_flags(Flags),
+ Deadline = deadline(Timeout),
+ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Deadline)
+ catch
+ throw:ERROR ->
+ ERROR
+ end.
+
+do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Deadline) ->
+
RecvRef = make_ref(),
case nif_recvmsg(SockRef, RecvRef, BufSz, CtrlSz, EFlags) of
+
{ok, _MsgHdr} = OK ->
OK;
- {error, eagain} when (Timeout =:= nowait) ->
- ?SELECT(recvmsg, RecvRef);
+ {error, exbusy} = Error when Deadline =:= nowait -> Error;
+ {error, exbusy = Reason} ->
+ %% Internal error:
+ %% we called recvmsg, got eagain, and called recvmsg again
+ %% - without waiting for select message
+ erlang:error(Reason);
+
+
+ {error, eagain} when Deadline =:= nowait ->
+ ?SELECT(recvmsg, RecvRef);
{error, eagain} ->
%% There is nothing just now, but we will be notified when there
%% is something to read (a select message).
- NewTimeout = next_timeout(TS, Timeout),
+ Timeout = timeout(Deadline),
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} ->
- do_recvmsg(SockRef, BufSz, CtrlSz, EFlags,
- next_timeout(TS, Timeout));
+ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Deadline);
{?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} ->
{error, Reason}
- after NewTimeout ->
+ after Timeout ->
cancel(SockRef, recvmsg, RecvRef),
{error, timeout}
end;
- {error, closed} = ERROR ->
- do_close(SockRef),
- ERROR;
{error, _Reason} = ERROR ->
ERROR
@@ -2503,7 +2469,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
-
%% ===========================================================================
%%
%% close - close a file descriptor
@@ -2523,9 +2488,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
Reason :: term().
close(#socket{ref = SockRef}) ->
- do_close(SockRef).
-
-do_close(SockRef) ->
case nif_close(SockRef) of
ok ->
nif_finalize_close(SockRef);
@@ -2555,10 +2517,7 @@ do_close(SockRef) ->
shutdown(#socket{ref = SockRef}, How) ->
try
- begin
- EHow = enc_shutdown_how(How),
- nif_shutdown(SockRef, EHow)
- end
+ nif_shutdown(SockRef, enc_shutdown_how(How))
catch
throw:T ->
T;
@@ -2627,23 +2586,15 @@ shutdown(#socket{ref = SockRef}, How) ->
setopt(#socket{ref = SockRef}, Level, Key, Value) ->
try
begin
- Domain = which_domain(SockRef),
- Type = which_type(SockRef),
- Protocol = which_protocol(SockRef),
+ {Domain, Type, Proto} = which_dtp(SockRef),
{EIsEncoded, ELevel} = enc_setopt_level(Level),
- EKey = enc_setopt_key(Level, Key, Domain, Type, Protocol),
- EVal = enc_setopt_value(Level, Key, Value, Domain, Type, Protocol),
+ EKey = enc_setopt_key(Level, Key, Domain, Type, Proto),
+ EVal = enc_setopt_value(Level, Key, Value, Domain, Type, Proto),
nif_setopt(SockRef, EIsEncoded, ELevel, EKey, EVal)
end
catch
- throw:T ->
- T;
- %% <WIN32-TEMPORARY>
- error:notsup:S ->
- erlang:raise(error, notsup, S);
- %% </WIN32-TEMPORARY>
- error:Reason ->
- {error, Reason} % Process more?
+ throw:ERROR ->
+ ERROR
end.
@@ -2703,33 +2654,26 @@ setopt(#socket{ref = SockRef}, Level, Key, Value) ->
getopt(#socket{ref = SockRef}, Level, Key) ->
try
begin
- Domain = which_domain(SockRef),
- Type = which_type(SockRef),
- Protocol = which_protocol(SockRef),
+ {Domain, Type, Proto} = which_dtp(SockRef),
{EIsEncoded, ELevel} = enc_getopt_level(Level),
- EKey = enc_getopt_key(Level, Key, Domain, Type, Protocol),
+ EKey = enc_getopt_key(Level, Key, Domain, Type, Proto),
%% We may need to decode the value (for the same reason
%% we (may have) needed to encode the value for setopt).
case nif_getopt(SockRef, EIsEncoded, ELevel, EKey) of
ok ->
ok;
{ok, EVal} ->
- Val = dec_getopt_value(Level, Key, EVal,
- Domain, Type, Protocol),
+ Val =
+ dec_getopt_value(
+ Level, Key, EVal, Domain, Type, Proto),
{ok, Val};
- {error, _} = ERROR ->
- ERROR
+ {error, _} = E ->
+ E
end
end
catch
- throw:E:_S ->
- E;
- %% <WIN32-TEMPORARY>
- error:notsup:S ->
- erlang:raise(error, notsup, S);
- %% </WIN32-TEMPORARY>
- error:Reason:_Stack ->
- {error, Reason} % Process more?
+ throw:ERROR ->
+ ERROR
end.
@@ -2744,41 +2688,75 @@ which_domain(SockRef) ->
case nif_getopt(SockRef, true,
?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_DOMAIN) of
{ok, Domain} ->
- Domain;
+ if
+ is_atom(Domain) ->
+ Domain;
+ is_integer(Domain) ->
+ invalid_domain(Domain)
+ end;
{error, _} = ERROR ->
throw(ERROR)
end.
--spec which_type(SockRef) -> Type when
- SockRef :: reference(),
- Type :: type().
-
-which_type(SockRef) ->
- case nif_getopt(SockRef, true,
- ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_TYPE) of
- {ok, Type} ->
- Type;
- {error, _} = ERROR ->
- throw(ERROR)
- end.
-
--spec which_protocol(SockRef) -> Protocol when
- SockRef :: reference(),
- Protocol :: protocol().
-
-which_protocol(SockRef) ->
- case nif_getopt(SockRef, true,
- ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_PROTOCOL) of
- {ok, Proto} ->
- Proto;
+%%%-spec which_type(SockRef) -> Type when
+%%% SockRef :: reference(),
+%%% Type :: type().
+%%%
+%%%which_type(SockRef) ->
+%%% case nif_getopt(SockRef, true,
+%%% ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_TYPE) of
+%%% {ok, Type} ->
+%%% if
+%%% is_atom(Type) ->
+%%% Type;
+%%% is_integer(Type) ->
+%%% invalid_type(Type)
+%%% end;
+%%% {error, _} = ERROR ->
+%%% throw(ERROR)
+%%% end.
+%%%
+%%%-spec which_protocol(SockRef) -> Protocol when
+%%% SockRef :: reference(),
+%%% Protocol :: protocol().
+%%%
+%%%which_protocol(SockRef) ->
+%%% case nif_getopt(SockRef, true,
+%%% ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_PROTOCOL) of
+%%% {ok, Proto} ->
+%%% if
+%%% is_atom(Proto) ->
+%%% Proto;
+%%% is_integer(Proto) ->
+%%% invalid_protocol(Proto)
+%%% end;
+%%% {error, _} = ERROR ->
+%%% throw(ERROR)
+%%% end.
+
+which_dtp(SockRef) ->
+ case
+ nif_getopt(
+ SockRef, true, ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_DTP)
+ of
+ {ok, {Domain, Type, Proto} = DTP} ->
+ if
+ is_integer(Domain) ->
+ invalid_domain(Domain);
+ is_integer(Type) ->
+ invalid_type(Type);
+ is_integer(Proto) ->
+ invalid_protocol(Proto);
+ is_atom(Domain), is_atom(Type), is_atom(Proto) ->
+ DTP
+ end;
{error, _} = ERROR ->
throw(ERROR)
end.
-
%% ===========================================================================
%%
%% sockname - return the current address of the socket.
@@ -2953,7 +2931,6 @@ enc_setopt_key(Level, Opt, Domain, Type, Protocol) ->
%% encode the value into an more "manageable" type.
%% It also handles "aliases" (see linger).
--dialyzer({nowarn_function, enc_setopt_value/6}).
-spec enc_setopt_value(otp, otp_socket_option(),
Value, Domain, Type, Protocol) -> term() when
Value :: term(),
@@ -3421,12 +3398,6 @@ enc_getopt_key(Level, Opt, Domain, Type, Protocol) ->
%% For the most part, we simply let the value pass through, but for some
%% values we may need to do an actual decode.
%%
-%% For some reason dialyzer thinks that the only valid value for Opt to
-%% this function is otp_socket_option(). Of course without explaining
-%% how it came to that conclusion... And since I know that to be false...
-%%
-
--dialyzer({nowarn_function, dec_getopt_value/6}).
%% This string is NULL-terminated, but the general function we use
%% in the nif code does not know that. So, deal with it here.
@@ -3443,78 +3414,77 @@ dec_getopt_value(_L, _Opt, V, _D, _T, _P) ->
%% Most options are usable both for set and get, but some are
%% are only available for e.g. get.
--spec enc_sockopt_key(Level, Opt,
- Direction,
+-spec enc_sockopt_key(Level, Opt, Direction,
Domain, Type, Protocol) -> non_neg_integer() when
Level :: otp,
Direction :: set | get,
Opt :: otp_socket_option(),
Domain :: domain(),
Type :: type(),
- Protocol :: protocol()
- ; (Level, Direction, Opt,
+ Protocol :: protocol();
+ (Level, Opt, Direction,
Domain, Type, Protocol) -> non_neg_integer() when
Level :: socket,
Direction :: set | get,
Opt :: socket_option(),
Domain :: domain(),
Type :: type(),
- Protocol :: protocol()
- ; (Level, Direction, Opt,
+ Protocol :: protocol();
+ (Level, Opt, Direction,
Domain, Type, Protocol) -> non_neg_integer() when
Level :: ip,
Direction :: set | get,
Opt :: ip_socket_option(),
Domain :: domain(),
Type :: type(),
- Protocol :: protocol()
- ; (Level, Direction, Opt,
+ Protocol :: protocol();
+ (Level, Opt, Direction,
Domain, Type, Protocol) -> non_neg_integer() when
Level :: ipv6,
Direction :: set | get,
Opt :: ipv6_socket_option(),
Domain :: domain(),
Type :: type(),
- Protocol :: protocol()
- ; (Level, Direction, Opt,
- Domain, Type, Protocol) -> non_neg_integer() when
+ Protocol :: protocol();
+ (Level, Opt, Direction,
+ Domain, Type, Protocol) -> non_neg_integer() when
Level :: tcp,
Direction :: set | get,
Opt :: tcp_socket_option(),
Domain :: domain(),
Type :: type(),
- Protocol :: protocol()
- ; (Level, Direction, Opt,
+ Protocol :: protocol();
+ (Level, Opt, Direction,
Domain, Type, Protocol) -> non_neg_integer() when
Level :: udp,
Direction :: set | get,
Opt :: udp_socket_option(),
Domain :: domain(),
Type :: type(),
- Protocol :: protocol()
- ; (Level, Direction, Opt,
+ Protocol :: protocol();
+ (Level, Opt, Direction,
Domain, Type, Protocol) -> non_neg_integer() when
Level :: sctp,
Direction :: set | get,
Opt :: sctp_socket_option(),
Domain :: domain(),
Type :: type(),
- Protocol :: protocol()
- ; (Level, Direction, Opt,
- Domain, Type, Protocol) -> non_neg_integer() when
+ Protocol :: protocol();
+ (Level, Opt, Direction,
+ Domain, Type, Protocol) -> non_neg_integer() when
Level :: integer(),
Direction :: set,
Opt :: integer(),
Domain :: domain(),
Type :: type(),
- Protocol :: protocol()
- ; (Level, Direction, Opt,
- Domain, Type, Protocol) -> non_neg_integer() when
+ Protocol :: protocol();
+ (Level, Opt, Direction,
+ Domain, Type, Protocol) -> {NativeOpt, ValueSize} when
Level :: integer(),
Direction :: get,
Opt :: {NativeOpt, ValueSize},
NativeOpt :: integer(),
- ValueSize :: non_neg_integer(),
+ ValueSize :: non_neg_integer() | 'int' | 'bool',
Domain :: domain(),
Type :: type(),
Protocol :: protocol().
@@ -3934,8 +3904,8 @@ ensure_sockaddr(#{family := local, path := Path} = SockAddr)
(byte_size(Path) > 0) andalso
(byte_size(Path) =< 255) ->
SockAddr;
-ensure_sockaddr(_SockAddr) ->
- einval().
+ensure_sockaddr(SockAddr) ->
+ invalid_address(SockAddr).
@@ -3943,19 +3913,29 @@ cancel(SockRef, Op, OpRef) ->
case nif_cancel(SockRef, Op, OpRef) of
%% The select has already completed
{error, select_sent} ->
- flush_select_msgs(SockRef, OpRef);
+ flush_select_msg(SockRef, OpRef),
+ _ = flush_abort_msg(SockRef, OpRef),
+ ok;
Other ->
+ _ = flush_abort_msg(SockRef, OpRef),
Other
end.
-flush_select_msgs(SockRef, Ref) ->
+flush_select_msg(SockRef, Ref) ->
receive
{?ESOCK_TAG, #socket{ref = SockRef}, select, Ref} ->
- flush_select_msgs(SockRef, Ref)
+ ok
after 0 ->
ok
end.
+flush_abort_msg(SockRef, Ref) ->
+ receive
+ {?ESOCK_TAG, #socket{ref = SockRef}, abort, {Ref, Reason}} ->
+ Reason
+ after 0 ->
+ ok
+ end.
%% formated_timestamp() ->
%% format_timestamp(os:timestamp()).
@@ -3983,34 +3963,37 @@ flush_select_msgs(SockRef, Ref) ->
%% lists:flatten(FormatDate).
-%% A timestamp in ms
+deadline(Timeout) ->
+ case Timeout of
+ nowait -> Timeout;
+ infinity -> Timeout;
+ _ when is_integer(Timeout), 0 =< Timeout ->
+ timestamp() + Timeout;
+ _ ->
+ invalid_timeout(Timeout)
+ end.
-timestamp(nowait = T) ->
- T;
-timestamp(infinity) ->
- undefined;
-timestamp(_) ->
- timestamp().
+timeout(Deadline) ->
+ case Deadline of
+ nowait -> 0;
+ infinity -> infinity;
+ _ ->
+ Now = timestamp(),
+ if
+ Now < Deadline -> Deadline - Now;
+ true -> 0
+ end
+ end.
timestamp() ->
erlang:monotonic_time(milli_seconds).
-next_timeout(_, nowait = Timeout) ->
- Timeout;
-next_timeout(_, infinity = Timeout) ->
- Timeout;
-next_timeout(TS, Timeout) ->
- NewTimeout = Timeout - tdiff(TS, timestamp()),
- if
- (NewTimeout > 0) ->
- NewTimeout;
- true ->
- 0
- end.
-
-tdiff(T1, T2) ->
- T2 - T1.
+-compile({inline, [bincat/2]}).
+bincat(<<>>, <<_/binary>> = B) -> B;
+bincat(<<_/binary>> = A, <<>>) -> A;
+bincat(<<_/binary>> = A, <<_/binary>> = B) ->
+ <<A/binary, B/binary>>.
%% p(F) ->
@@ -4036,42 +4019,45 @@ tdiff(T1, T2) ->
-spec invalid_domain(Domain) -> no_return() when
Domain :: term().
-
invalid_domain(Domain) ->
error({invalid_domain, Domain}).
-spec invalid_type(Type) -> no_return() when
Type :: term().
-
invalid_type(Type) ->
error({invalid_type, Type}).
-spec invalid_protocol(Proto) -> no_return() when
Proto :: term().
-
invalid_protocol(Proto) ->
error({invalid_protocol, Proto}).
+-spec invalid_address(SockAddr) -> no_return() when
+ SockAddr :: term().
+invalid_address(SockAddr) ->
+ error({invalid_address, SockAddr}).
+
+-spec invalid_timeout(Timeout) -> no_return() when
+ Timeout :: term().
+invalid_timeout(Timeout) ->
+ error({invalid_timeout, Timeout}).
+
-spec not_supported(What) -> no_return() when
What :: term().
-
not_supported(What) ->
error({not_supported, What}).
-spec unknown(What) -> no_return() when
What :: term().
-
unknown(What) ->
error({unknown, What}).
-spec einval() -> no_return().
-
einval() ->
error(einval).
-spec error(Reason) -> no_return() when
Reason :: term().
-
error(Reason) ->
throw({error, Reason}).