summaryrefslogtreecommitdiff
path: root/examples/c/ex_rep/base/rep_msg.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/c/ex_rep/base/rep_msg.c')
-rw-r--r--examples/c/ex_rep/base/rep_msg.c66
1 files changed, 51 insertions, 15 deletions
diff --git a/examples/c/ex_rep/base/rep_msg.c b/examples/c/ex_rep/base/rep_msg.c
index 8ed45f7f..bbcaed45 100644
--- a/examples/c/ex_rep/base/rep_msg.c
+++ b/examples/c/ex_rep/base/rep_msg.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -80,6 +80,10 @@ hm_loop(args)
for (ret = 0; ret == 0;) {
if ((ret = get_next_message(fd, &rec, &control)) != 0) {
+ if (app->shared_data.app_finished) {
+ ret = 0;
+ goto netclose;
+ }
/*
* Close this connection; if it's the master call
* for an election.
@@ -183,6 +187,7 @@ hm_loop(args)
"thread join failure");
goto out;
}
+ free(ea);
ea = NULL;
}
if ((ea = calloc(sizeof(elect_args), 1)) == NULL) {
@@ -219,15 +224,21 @@ hm_loop(args)
out: if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0)
ret = t_ret;
-
+netclose:
/* Don't close the environment before any children exit. */
- if (ea != NULL && thread_join(elect_thr, &status) != 0)
- dbenv->errx(dbenv, "can't join election thread");
+ if (ea != NULL) {
+ if (thread_join(elect_thr, &status) != 0)
+ dbenv->errx(dbenv, "can't join election thread");
+ free(ea);
+ ea = NULL;
+ }
- if (site_thrs != NULL)
+ if (site_thrs != NULL) {
while (--nsites >= 0)
if (thread_join(site_thrs[nsites], &status) != 0)
dbenv->errx(dbenv, "can't join site thread");
+ free(site_thrs);
+ }
return ((void *)(uintptr_t)ret);
}
@@ -242,6 +253,7 @@ connect_thread(args)
void *args;
{
DB_ENV *dbenv;
+ APP_DATA *app;
const char *home, *progname;
hm_loop_args *ha;
connect_args *cargs;
@@ -252,18 +264,20 @@ connect_thread(args)
socket_t fd, ns;
ha = NULL;
+ i = 0;
cargs = (connect_args *)args;
dbenv = cargs->dbenv;
home = cargs->home;
progname = cargs->progname;
machtab = cargs->machtab;
port = cargs->port;
+ app = dbenv->app_private;
/*
* Loop forever, accepting connections from new machines,
* and forking off a thread to handle each.
*/
- if ((fd = listen_socket_init(progname, port)) < 0) {
+ if ((fd = listen_socket_init(progname, port, machtab)) < 0) {
ret = errno;
goto err;
}
@@ -271,7 +285,10 @@ connect_thread(args)
for (i = 0; i < MAX_THREADS; i++) {
if ((ns = listen_socket_accept(machtab,
progname, fd, &eid)) == SOCKET_CREATION_FAILURE) {
- ret = errno;
+ if (app->shared_data.app_finished)
+ ret = 0;
+ else
+ ret = errno;
goto err;
}
if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) {
@@ -285,7 +302,7 @@ connect_thread(args)
ha->eid = eid;
ha->tab = machtab;
ha->dbenv = dbenv;
- if ((ret = thread_create(&hm_thrs[i++], NULL,
+ if ((ret = thread_create(&hm_thrs[i], NULL,
hm_loop, (void *)ha)) != 0) {
dbenv->errx(dbenv, "can't create thread for site");
goto err;
@@ -297,12 +314,13 @@ connect_thread(args)
dbenv->errx(dbenv, "Too many threads");
ret = ENOMEM;
+err:
/* Do not return until all threads have exited. */
while (--i >= 0)
if (thread_join(hm_thrs[i], &status) != 0)
dbenv->errx(dbenv, "can't join site thread");
-err: return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE);
+ return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE);
}
/*
@@ -314,15 +332,15 @@ connect_all(args)
void *args;
{
DB_ENV *dbenv;
+ APP_DATA *app;
all_args *aa;
const char *home, *progname;
- hm_loop_args *ha;
int failed, i, nsites, open, ret, *success;
machtab_t *machtab;
- thread_t *hm_thr;
+ thread_t empty_id, *hm_thr;
repsite_t *sites;
+ void *status;
- ha = NULL;
aa = (all_args *)args;
dbenv = aa->dbenv;
progname = aa->progname;
@@ -330,6 +348,9 @@ connect_all(args)
machtab = aa->machtab;
nsites = aa->nsites;
sites = aa->sites;
+ status = NULL;
+ app = dbenv->app_private;
+ memset(&empty_id, 0, sizeof(thread_t));
ret = 0;
hm_thr = NULL;
@@ -353,6 +374,9 @@ connect_all(args)
if (success[i])
continue;
+ if (app->shared_data.app_finished)
+ goto err;
+
ret = connect_site(dbenv, machtab,
progname, &sites[i], &open, &hm_thr[i]);
@@ -378,7 +402,15 @@ connect_all(args)
sleep(1);
}
-err: if (success != NULL)
+err:
+ for (i = 0; i < nsites; i++) {
+ if (!success[i] ||
+ memcmp(&hm_thr[i], &empty_id, sizeof(thread_t)) == 0)
+ continue;
+ if (thread_join(hm_thr[i], &status) != 0)
+ dbenv->errx(dbenv, "can't join site thread");
+ }
+ if (success != NULL)
free(success);
if (hm_thr != NULL)
free(hm_thr);
@@ -453,12 +485,16 @@ elect_thread(args)
machtab_parm(machtab, &n, &timeout);
(void)dbenv->rep_set_timeout(dbenv, DB_REP_ELECTION_TIMEOUT, timeout);
- while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0)
+ while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0) {
+ if (app->shared_data.app_finished)
+ return (NULL);
sleep(2);
+ }
if (app->elected) {
app->elected = 0;
- if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0)
+ if ((ret = dbenv->rep_start(dbenv, NULL,
+ DB_REP_MASTER)) != 0 && !app->shared_data.app_finished)
dbenv->err(dbenv, ret,
"can't start as master in election thread");
}