summaryrefslogtreecommitdiff
path: root/erts
diff options
context:
space:
mode:
authorRaimo Niskanen <raimo@erlang.org>2020-01-28 18:30:19 +0100
committerRaimo Niskanen <raimo@erlang.org>2020-01-29 15:03:57 +0100
commite68899ec50ecfe87419c7737c0f6ff731b106e55 (patch)
tree6f93923af2a0bb900d7621cd985f349fe3ca59f0 /erts
parent173c6a289b60b1e4e0d66eb6f5b81eaf1e4035d0 (diff)
downloaderlang-e68899ec50ecfe87419c7737c0f6ff731b106e55.tar.gz
Cleanup requestor handling
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c156
1 files changed, 74 insertions, 82 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 13e8478ebd..cbc80ed290 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -2679,6 +2679,8 @@ ESOCK_OPERATOR_FUNCS_DEFS
static BOOLEAN_T requestor_pop(ESockRequestQueue* q,
ESockRequestor* reqP);
+static void requestor_clear(ESockRequestor* reqP);
+
static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
ESockRequestQueue* q,
ErlNifPid* pid);
@@ -2703,6 +2705,10 @@ static int esock_demonitor(const char* slogan,
static void esock_monitor_init(ESockMonitor* mon);
static ERL_NIF_TERM esock_make_monitor_term(ErlNifEnv* env,
const ESockMonitor* monP);
+static void esock_release_current(const char* slogan,
+ ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ESockRequestor* current);
#endif // if defined(__WIN32__)
@@ -6490,13 +6496,6 @@ ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env,
descP->state = ESOCK_STATE_LISTENING;
descP->currentAcceptorP = NULL;
- /* Do we really need this?
- * The activate_next_acceptor (actually the requestor_pop) function
- * initiates these values if there are no waiting acceptor...
- */
- descP->currentAcceptor.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentAcceptor.pid);
- MON_INIT(&descP->currentAcceptor.mon);
}
}
@@ -6543,10 +6542,13 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env,
ESOCK_CNT_INC(env, descP, sockRef, atom_acc_fails, &descP->accFails, 1);
- req.env = NULL;
+ esock_release_current("esock_accept_accepting_current_error",
+ env, descP, descP->currentAcceptorP);
+
reason = MKA(env, erl_errno_id(save_errno));
res = esock_make_error(env, reason);
+ req.env = NULL;
while (acceptor_pop(env, descP, &req)) {
SSDBG( descP,
("SOCKET",
@@ -6558,7 +6560,7 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env,
DEMONP("esock_accept_accepting_current_error -> pop'ed writer",
env, descP, &req.mon);
}
-
+ descP->currentAcceptorP = NULL;
}
return res;
@@ -6607,6 +6609,21 @@ ERL_NIF_TERM esock_accept_busy_retry(ErlNifEnv* env,
if ((sres = esock_select_read(env, descP->sock, descP, pid,
sockRef, accRef)) < 0) {
+
+ esock_release_current("esock_accept_busy_retry", env,
+ descP, descP->currentAcceptorP);
+ /* It is very unlikely that a next acceptor will be able
+ * to do anything succesful, but we will clean the queue
+ */
+ if (!activate_next_acceptor(env, descP, sockRef)) {
+ SSDBG( descP,
+ ("SOCKET",
+ "esock_accept_busy_retry -> no more acceptors\r\n") );
+
+ descP->state = ESOCK_STATE_LISTENING;
+ descP->currentAcceptorP = NULL;
+ }
+
reason = MKT2(env, esock_atom_select_failed, MKI(env, sres));
res = esock_make_error(env, reason);
} else {
@@ -15163,13 +15180,6 @@ ERL_NIF_TERM esock_cancel_accept_current(ErlNifEnv* env,
descP->state = ESOCK_STATE_LISTENING;
descP->currentAcceptorP = NULL;
- /* Do we really need this?
- * The activate_next_acceptor (actually the requestor_pop) function
- * initiates these values if there are no waiting acceptor...
- */
- descP->currentAcceptor.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentAcceptor.pid);
- MON_INIT(&descP->currentAcceptor.mon);
}
SSDBG( descP, ("SOCKET", "esock_cancel_accept_current -> done with result:"
@@ -15273,10 +15283,8 @@ ERL_NIF_TERM esock_cancel_send_current(ErlNifEnv* env,
if (!activate_next_writer(env, descP, sockRef)) {
SSDBG( descP,
("SOCKET", "esock_cancel_send_current -> no more writers\r\n") );
- descP->currentWriterP = NULL;
- descP->currentWriter.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentWriter.pid);
- esock_monitor_init(&descP->currentWriter.mon);
+
+ descP->currentWriterP = NULL;
}
SSDBG( descP, ("SOCKET", "esock_cancel_send_current -> done with result:"
@@ -15379,10 +15387,8 @@ ERL_NIF_TERM esock_cancel_recv_current(ErlNifEnv* env,
if (!activate_next_reader(env, descP, sockRef)) {
SSDBG( descP,
("SOCKET", "esock_cancel_recv_current -> no more readers\r\n") );
- descP->currentReaderP = NULL;
- descP->currentReader.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentReader.pid);
- esock_monitor_init(&descP->currentReader.mon);
+
+ descP->currentReaderP = NULL;
}
SSDBG( descP, ("SOCKET", "esock_cancel_recv_current -> done with result:"
@@ -15642,12 +15648,11 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env,
*/
if (!activate_next_writer(env, descP, sockRef)) {
- descP->currentWriterP = NULL;
- ESOCK_ASSERT(!descP->currentWriter.env);
- descP->currentWriter.env = NULL;
- descP->currentWriter.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentWriter.pid);
- esock_monitor_init(&descP->currentWriter.mon);
+
+ SSDBG( descP,
+ ("SOCKET", "send_check_ok -> no more writers\r\n") );
+
+ descP->currentWriterP = NULL;
}
return esock_atom_ok;
@@ -15683,10 +15688,12 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env,
if (descP->currentWriterP != NULL) {
- DEMONP("send_check_fail -> current writer",
- env, descP, &descP->currentWriter.mon);
+ esock_release_current("send_check_fail",
+ env, descP, descP->currentWriterP);
send_error_waiting_writers(env, descP, sockRef, reason);
+
+ descP->currentWriterP = NULL;
}
}
return esock_make_error(env, reason);
@@ -15714,7 +15721,7 @@ void send_error_waiting_writers(ErlNifEnv* env,
esock_send_abort_msg(env, sockRef, req.ref, req.env,
reason, &req.pid);
req.env = NULL;
- DEMONP("send_error_current_writer -> pop'ed writer",
+ DEMONP("send_error_waiting_writers -> pop'ed writer",
env, descP, &req.mon);
}
}
@@ -15943,10 +15950,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
("SOCKET",
"recv_update_current_reader -> no more readers\r\n") );
- descP->currentReaderP = NULL;
- descP->currentReader.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentReader.pid);
- esock_monitor_init(&descP->currentReader.mon);
+ descP->currentReaderP = NULL;
}
}
@@ -15972,8 +15976,8 @@ void recv_error_current_reader(ErlNifEnv* env,
if (descP->currentReaderP != NULL) {
ESockRequestor req;
- DEMONP("recv_error_current_reader -> current reader",
- env, descP, &descP->currentReader.mon);
+ esock_release_current("recv_error_current_reader",
+ env, descP, descP->currentReaderP);
req.env = NULL; /* read by reader_pop before free */
while (reader_pop(env, descP, &req)) {
@@ -15986,6 +15990,8 @@ void recv_error_current_reader(ErlNifEnv* env,
DEMONP("recv_error_current_reader -> pop'ed reader",
env, descP, &req.mon);
}
+
+ descP->currentReaderP = NULL;
}
}
@@ -16345,30 +16351,16 @@ ERL_NIF_TERM recv_check_fail_econnreset(ErlNifEnv* env,
* </KOLLA>
*/
- MLOCK(descP->writeMtx);
MLOCK(descP->closeMtx);
descP->closeLocal = FALSE;
descP->state = ESOCK_STATE_CLOSING;
descP->isReadable = FALSE;
- descP->isWritable = FALSE;
MUNLOCK(descP->closeMtx);
recv_error_current_reader(env, descP, sockRef, reason);
- if (descP->currentWriterP != NULL) {
- ESockRequestor *reqP = descP->currentWriterP;
- esock_send_abort_msg(env, sockRef, reqP->ref, reqP->env,
- reason, &reqP->pid);
- reqP->env = NULL;
- DEMONP("recv_check_fail_econnreset -> current writer",
- env, descP, &reqP->mon);
- descP->currentWriterP = NULL;
- send_error_waiting_writers(env, descP, sockRef, reason);
- }
-
- MUNLOCK(descP->writeMtx);
return res;
}
@@ -18981,10 +18973,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "esock[w,%d]", sock);
descP->writeMtx = MCREATE(buf);
- enif_set_pid_undefined(&descP->currentWriter.pid);
- MON_INIT(&descP->currentWriter.mon);
- descP->currentWriter.env = NULL;
- descP->currentWriter.ref = esock_atom_undefined;
+ requestor_clear(&descP->currentWriter);
descP->currentWriterP = NULL; // currentWriter not used
descP->writersQ.first = NULL;
descP->writersQ.last = NULL;
@@ -18999,10 +18988,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "esock[r,%d]", sock);
descP->readMtx = MCREATE(buf);
- enif_set_pid_undefined(&descP->currentReader.pid);
- MON_INIT(&descP->currentReader.mon);
- descP->currentReader.env = NULL;
- descP->currentReader.ref = esock_atom_undefined;
+ requestor_clear(&descP->currentReader);
descP->currentReaderP = NULL; // currentReader not used
descP->readersQ.first = NULL;
descP->readersQ.last = NULL;
@@ -19017,10 +19003,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "esock[acc,%d]", sock);
descP->accMtx = MCREATE(buf);
- enif_set_pid_undefined(&descP->currentAcceptor.pid);
- MON_INIT(&descP->currentAcceptor.mon);
- descP->currentAcceptor.env = NULL;
- descP->currentAcceptor.ref = esock_atom_undefined;
+ requestor_clear(&descP->currentAcceptor);
descP->currentAcceptorP = NULL; // currentAcceptor not used
descP->acceptorsQ.first = NULL;
descP->acceptorsQ.last = NULL;
@@ -20294,15 +20277,19 @@ BOOLEAN_T requestor_pop(ESockRequestQueue* q,
return TRUE;
} else {
/* Queue was empty */
- enif_set_pid_undefined(&reqP->pid);
- MON_INIT(&reqP->mon);
- reqP->env = NULL;
- reqP->ref = esock_atom_undefined; // Just in case
+ requestor_clear(reqP);
return FALSE;
}
}
+static void requestor_clear(ESockRequestor* reqP) {
+ enif_set_pid_undefined(&reqP->pid);
+ MON_INIT(&reqP->mon);
+ reqP->env = NULL;
+ reqP->ref = esock_atom_undefined;
+}
+
static
BOOLEAN_T qsearch4pid(ErlNifEnv* env,
@@ -20505,7 +20492,7 @@ int esock_demonitor(const char* slogan,
res = enif_demonitor_process(env, descP, &monP->mon);
if (res == 0) {
- esock_monitor_init(monP);
+ MON_INIT(monP);
} else {
SSDBG( descP,
("SOCKET", "[%d][%T] %s: demonitor failed: %d\r\n",
@@ -20534,6 +20521,17 @@ ERL_NIF_TERM esock_make_monitor_term(ErlNifEnv* env, const ESockMonitor* monP)
+static
+void esock_release_current(const char* slogan,
+ ErlNifEnv* env,
+ ESockDescriptor* descP,
+ ESockRequestor* current)
+{
+ DEMONP(slogan, env, descP, &current->mon);
+ esock_free_env(slogan, current->env);
+ requestor_clear(current);
+}
+
#endif // if !defined(__WIN32__)
@@ -21130,12 +21128,8 @@ void esock_down_acceptor(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "esock_down_acceptor -> no more writers\r\n") );
- descP->state = ESOCK_STATE_LISTENING;
-
- descP->currentAcceptorP = NULL;
- descP->currentAcceptor.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentAcceptor.pid);
- esock_monitor_init(&descP->currentAcceptor.mon);
+ descP->state = ESOCK_STATE_LISTENING;
+ descP->currentAcceptorP = NULL;
}
} else {
@@ -21171,12 +21165,11 @@ void esock_down_writer(ErlNifEnv* env,
"current writer - try activate next\r\n") );
if (!activate_next_writer(env, descP, sockRef)) {
+
SSDBG( descP, ("SOCKET",
"esock_down_writer -> no active writer\r\n") );
+
descP->currentWriterP = NULL;
- descP->currentWriter.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentWriter.pid);
- esock_monitor_init(&descP->currentWriter.mon);
}
} else {
@@ -21212,13 +21205,12 @@ void esock_down_reader(ErlNifEnv* env,
"current reader - try activate next\r\n") );
if (!activate_next_reader(env, descP, sockRef)) {
+
SSDBG( descP,
("SOCKET",
"esock_down_reader -> no more readers\r\n") );
+
descP->currentReaderP = NULL;
- descP->currentReader.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentReader.pid);
- esock_monitor_init(&descP->currentReader.mon);
}
} else {