summaryrefslogtreecommitdiff
path: root/examples/c/ex_rep
diff options
context:
space:
mode:
Diffstat (limited to 'examples/c/ex_rep')
-rw-r--r--examples/c/ex_rep/base/rep_base.c4
-rw-r--r--examples/c/ex_rep/base/rep_base.h7
-rw-r--r--examples/c/ex_rep/base/rep_msg.c66
-rw-r--r--examples/c/ex_rep/base/rep_net.c52
-rw-r--r--examples/c/ex_rep/common/rep_common.c76
-rw-r--r--examples/c/ex_rep/common/rep_common.h30
-rw-r--r--examples/c/ex_rep/mgr/rep_mgr.c19
7 files changed, 228 insertions, 26 deletions
diff --git a/examples/c/ex_rep/base/rep_base.c b/examples/c/ex_rep/base/rep_base.c
index c9443e4d..56662220 100644
--- a/examples/c/ex_rep/base/rep_base.c
+++ b/examples/c/ex_rep/base/rep_base.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -169,6 +169,8 @@ main(argc, argv)
goto err;
}
+ machtab_destroy(machtab);
+
/* Finish checkpoint and log archive threads. */
if ((ret = finish_support_threads(&ckp_thr, &lga_thr)) != 0)
goto err;
diff --git a/examples/c/ex_rep/base/rep_base.h b/examples/c/ex_rep/base/rep_base.h
index 27901349..48d5d3ac 100644
--- a/examples/c/ex_rep/base/rep_base.h
+++ b/examples/c/ex_rep/base/rep_base.h
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -71,6 +71,8 @@ typedef SOCKET socket_t;
#define readsocket(s, buf, sz) recv((s), (buf), (int)(sz), 0)
#define writesocket(s, buf, sz) send((s), (const char *)(buf), (int)(sz), 0)
#define net_errno WSAGetLastError()
+#undef SHUT_RDWR
+#define SHUT_RDWR SD_BOTH
#else /* !_WIN32 */
@@ -104,9 +106,10 @@ socket_t get_accepted_socket __P((const char *, int));
socket_t get_connected_socket
__P((machtab_t *, const char *, const char *, int, int *, int *));
int get_next_message __P((socket_t, DBT *, DBT *));
-socket_t listen_socket_init __P((const char *, int));
+socket_t listen_socket_init __P((const char *, int, machtab_t *));
socket_t listen_socket_accept
__P((machtab_t *, const char *, socket_t, int *));
+int machtab_destroy __P((machtab_t *));
int machtab_getinfo __P((machtab_t *, int, u_int32_t *, int *));
int machtab_init __P((machtab_t **, int));
void machtab_parm __P((machtab_t *, int *, u_int32_t *));
diff --git a/examples/c/ex_rep/base/rep_msg.c b/examples/c/ex_rep/base/rep_msg.c
index 8ed45f7f..bbcaed45 100644
--- a/examples/c/ex_rep/base/rep_msg.c
+++ b/examples/c/ex_rep/base/rep_msg.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -80,6 +80,10 @@ hm_loop(args)
for (ret = 0; ret == 0;) {
if ((ret = get_next_message(fd, &rec, &control)) != 0) {
+ if (app->shared_data.app_finished) {
+ ret = 0;
+ goto netclose;
+ }
/*
* Close this connection; if it's the master call
* for an election.
@@ -183,6 +187,7 @@ hm_loop(args)
"thread join failure");
goto out;
}
+ free(ea);
ea = NULL;
}
if ((ea = calloc(sizeof(elect_args), 1)) == NULL) {
@@ -219,15 +224,21 @@ hm_loop(args)
out: if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0)
ret = t_ret;
-
+netclose:
/* Don't close the environment before any children exit. */
- if (ea != NULL && thread_join(elect_thr, &status) != 0)
- dbenv->errx(dbenv, "can't join election thread");
+ if (ea != NULL) {
+ if (thread_join(elect_thr, &status) != 0)
+ dbenv->errx(dbenv, "can't join election thread");
+ free(ea);
+ ea = NULL;
+ }
- if (site_thrs != NULL)
+ if (site_thrs != NULL) {
while (--nsites >= 0)
if (thread_join(site_thrs[nsites], &status) != 0)
dbenv->errx(dbenv, "can't join site thread");
+ free(site_thrs);
+ }
return ((void *)(uintptr_t)ret);
}
@@ -242,6 +253,7 @@ connect_thread(args)
void *args;
{
DB_ENV *dbenv;
+ APP_DATA *app;
const char *home, *progname;
hm_loop_args *ha;
connect_args *cargs;
@@ -252,18 +264,20 @@ connect_thread(args)
socket_t fd, ns;
ha = NULL;
+ i = 0;
cargs = (connect_args *)args;
dbenv = cargs->dbenv;
home = cargs->home;
progname = cargs->progname;
machtab = cargs->machtab;
port = cargs->port;
+ app = dbenv->app_private;
/*
* Loop forever, accepting connections from new machines,
* and forking off a thread to handle each.
*/
- if ((fd = listen_socket_init(progname, port)) < 0) {
+ if ((fd = listen_socket_init(progname, port, machtab)) < 0) {
ret = errno;
goto err;
}
@@ -271,7 +285,10 @@ connect_thread(args)
for (i = 0; i < MAX_THREADS; i++) {
if ((ns = listen_socket_accept(machtab,
progname, fd, &eid)) == SOCKET_CREATION_FAILURE) {
- ret = errno;
+ if (app->shared_data.app_finished)
+ ret = 0;
+ else
+ ret = errno;
goto err;
}
if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) {
@@ -285,7 +302,7 @@ connect_thread(args)
ha->eid = eid;
ha->tab = machtab;
ha->dbenv = dbenv;
- if ((ret = thread_create(&hm_thrs[i++], NULL,
+ if ((ret = thread_create(&hm_thrs[i], NULL,
hm_loop, (void *)ha)) != 0) {
dbenv->errx(dbenv, "can't create thread for site");
goto err;
@@ -297,12 +314,13 @@ connect_thread(args)
dbenv->errx(dbenv, "Too many threads");
ret = ENOMEM;
+err:
/* Do not return until all threads have exited. */
while (--i >= 0)
if (thread_join(hm_thrs[i], &status) != 0)
dbenv->errx(dbenv, "can't join site thread");
-err: return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE);
+ return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE);
}
/*
@@ -314,15 +332,15 @@ connect_all(args)
void *args;
{
DB_ENV *dbenv;
+ APP_DATA *app;
all_args *aa;
const char *home, *progname;
- hm_loop_args *ha;
int failed, i, nsites, open, ret, *success;
machtab_t *machtab;
- thread_t *hm_thr;
+ thread_t empty_id, *hm_thr;
repsite_t *sites;
+ void *status;
- ha = NULL;
aa = (all_args *)args;
dbenv = aa->dbenv;
progname = aa->progname;
@@ -330,6 +348,9 @@ connect_all(args)
machtab = aa->machtab;
nsites = aa->nsites;
sites = aa->sites;
+ status = NULL;
+ app = dbenv->app_private;
+ memset(&empty_id, 0, sizeof(thread_t));
ret = 0;
hm_thr = NULL;
@@ -353,6 +374,9 @@ connect_all(args)
if (success[i])
continue;
+ if (app->shared_data.app_finished)
+ goto err;
+
ret = connect_site(dbenv, machtab,
progname, &sites[i], &open, &hm_thr[i]);
@@ -378,7 +402,15 @@ connect_all(args)
sleep(1);
}
-err: if (success != NULL)
+err:
+ for (i = 0; i < nsites; i++) {
+ if (!success[i] ||
+ memcmp(&hm_thr[i], &empty_id, sizeof(thread_t)) == 0)
+ continue;
+ if (thread_join(hm_thr[i], &status) != 0)
+ dbenv->errx(dbenv, "can't join site thread");
+ }
+ if (success != NULL)
free(success);
if (hm_thr != NULL)
free(hm_thr);
@@ -453,12 +485,16 @@ elect_thread(args)
machtab_parm(machtab, &n, &timeout);
(void)dbenv->rep_set_timeout(dbenv, DB_REP_ELECTION_TIMEOUT, timeout);
- while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0)
+ while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0) {
+ if (app->shared_data.app_finished)
+ return (NULL);
sleep(2);
+ }
if (app->elected) {
app->elected = 0;
- if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0)
+ if ((ret = dbenv->rep_start(dbenv, NULL,
+ DB_REP_MASTER)) != 0 && !app->shared_data.app_finished)
dbenv->err(dbenv, ret,
"can't start as master in election thread");
}
diff --git a/examples/c/ex_rep/base/rep_net.c b/examples/c/ex_rep/base/rep_net.c
index 29012536..e57e3af8 100644
--- a/examples/c/ex_rep/base/rep_net.c
+++ b/examples/c/ex_rep/base/rep_net.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -70,6 +70,7 @@ struct __machtab {
int current;
int max;
int nsites;
+ socket_t listenfd;
};
/* Data structure that describes each entry in the machtab. */
@@ -113,6 +114,7 @@ machtab_init(machtabp, nsites)
machtab->timeout_time = 2 * 1000000; /* 2 seconds. */
machtab->current = machtab->max = 0;
machtab->nsites = nsites;
+ machtab->listenfd = SOCKET_CREATION_FAILURE;
ret = mutex_init(&machtab->mtmutex, NULL);
*machtabp = machtab;
@@ -265,6 +267,47 @@ machtab_rem(machtab, eid, lock)
return (ret);
}
+/*
+ * machtab_destroy --
+ * Close the listening socket and all connecting sockets,
+ * So that all threads blocked on 'accept' and 'read' will be
+ * unblocked.
+ */
+int machtab_destroy(machtab)
+ machtab_t *machtab;
+{
+ int ret;
+ socket_t listenfd;
+ member_t *member;
+
+ if ((ret = mutex_lock(&machtab->mtmutex)) != 0) {
+ fprintf(stderr, "can't lock mutex\n");
+ return (ret);
+ }
+
+ listenfd = machtab->listenfd;
+ machtab->listenfd = SOCKET_CREATION_FAILURE;
+ for (member = LIST_FIRST(&machtab->machlist);
+ member != NULL;
+ member = LIST_FIRST(&machtab->machlist)) {
+ LIST_REMOVE(member, links);
+ shutdown(member->fd, SHUT_RDWR);
+ (void)closesocket(member->fd);
+ free(member);
+ machtab->current--;
+ }
+ shutdown(listenfd, SHUT_RDWR);
+ (void)closesocket(listenfd);
+ machtab->nextid = 2;
+ machtab->current = machtab->max = 0;
+
+ if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) {
+ fprintf(stderr, "can't unlock mutex\n");
+ return (ret);
+ }
+ return (ret);
+}
+
void
machtab_parm(machtab, nump, timeoutp)
machtab_t *machtab;
@@ -310,9 +353,10 @@ machtab_print(machtab)
* in a thread that we're happy to let block.
*/
socket_t
-listen_socket_init(progname, port)
+listen_socket_init(progname, port, machtab)
const char *progname;
int port;
+ machtab_t *machtab;
{
socket_t s;
int sockopt;
@@ -349,6 +393,7 @@ listen_socket_init(progname, port)
goto err;
}
+ machtab->listenfd = s;
return (s);
err: closesocket(s);
@@ -380,7 +425,8 @@ accept_wait:
si_len = sizeof(si);
ns = accept(s, (struct sockaddr *)&si, &si_len);
if (ns == SOCKET_CREATION_FAILURE) {
- fprintf(stderr, "can't accept incoming connection\n");
+ if (machtab->listenfd != SOCKET_CREATION_FAILURE)
+ fprintf(stderr, "can't accept incoming connection\n");
return ns;
}
host = ntohl(si.sin_addr.s_addr);
diff --git a/examples/c/ex_rep/common/rep_common.c b/examples/c/ex_rep/common/rep_common.c
index a091adda..d7d2585a 100644
--- a/examples/c/ex_rep/common/rep_common.c
+++ b/examples/c/ex_rep/common/rep_common.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -19,6 +19,18 @@
#define DATABASE "quote.db"
#define SLEEPTIME 3
+/*
+ * Definition of thread-specific data key for PERM_FAILED structure
+ * stored in thread local storage.
+ */
+#ifdef _WIN32
+/* Windows style. */
+DWORD permfail_key;
+#else
+/* Posix style. */
+pthread_key_t permfail_key;
+#endif
+
static int print_stocks __P((DB *));
/*
@@ -346,15 +358,27 @@ doloop(dbenv, shared_data)
{
DB *dbp;
DBT key, data;
+ permfail_t *pfinfo;
char buf[BUFSIZE], *first, *price;
u_int32_t flags;
int ret;
dbp = NULL;
+ pfinfo = NULL;
ret = 0;
memset(&key, 0, sizeof(key));
memset(&data, 0, sizeof(data));
+ /* Allocate put/commit thread's PERM_FAILED structure. */
+ if (shared_data->is_repmgr) {
+ if ((pfinfo = malloc(sizeof(permfail_t))) == NULL)
+ goto err;
+ if ((ret = thread_setspecific(permfail_key, pfinfo)) != 0)
+ goto err;
+ pfinfo->thread_name = "PutCommit";
+ pfinfo->flag = 0;
+ }
+
for (;;) {
printf("QUOTESERVER%s> ",
shared_data->is_master ? "" : " (read-only)");
@@ -431,6 +455,16 @@ doloop(dbenv, shared_data)
dbenv->err(dbenv, ret, "DB->open");
goto err;
}
+ /* Check this thread's PERM_FAILED indicator. */
+ if (shared_data->is_repmgr) {
+ pfinfo = (permfail_t *)thread_getspecific(
+ permfail_key);
+ if (pfinfo->flag)
+ printf(
+ "%s Thread: dbopen not durable.\n",
+ pfinfo->thread_name);
+ pfinfo->flag = 0;
+ }
}
if (first == NULL) {
@@ -470,11 +504,23 @@ doloop(dbenv, shared_data)
dbp->err(dbp, ret, "DB->put");
goto err;
}
+ /* Check this thread's PERM_FAILED indicator. */
+ if (shared_data->is_repmgr) {
+ pfinfo = (permfail_t *)thread_getspecific(
+ permfail_key);
+ if (pfinfo->flag)
+ printf(
+ "%s Thread: put %s %s not durable.\n",
+ pfinfo->thread_name, first, price);
+ pfinfo->flag = 0;
+ }
}
}
err: if (dbp != NULL)
(void)dbp->close(dbp, DB_NOSYNC);
+ if (pfinfo != NULL)
+ free(pfinfo);
return (ret);
}
@@ -575,12 +621,24 @@ checkpoint_thread(args)
{
DB_ENV *dbenv;
SHARED_DATA *shared;
+ permfail_t *pfinfo;
supthr_args *ca;
int i, ret;
ca = (supthr_args *)args;
dbenv = ca->dbenv;
shared = ca->shared;
+ pfinfo = NULL;
+
+ /* Allocate checkpoint thread's PERM_FAILED structure. */
+ if (shared->is_repmgr) {
+ if ((pfinfo = malloc(sizeof(permfail_t))) == NULL)
+ return ((void *)EXIT_FAILURE);
+ if ((ret = thread_setspecific(permfail_key, pfinfo)) != 0)
+ return ((void *)EXIT_FAILURE);
+ pfinfo->thread_name = "Checkpoint";
+ pfinfo->flag = 0;
+ }
for (;;) {
/*
@@ -590,16 +648,30 @@ checkpoint_thread(args)
*/
for (i = 0; i < 60; i++) {
sleep(1);
- if (shared->app_finished == 1)
+ if (shared->app_finished == 1) {
+ if (pfinfo != NULL)
+ free(pfinfo);
return ((void *)EXIT_SUCCESS);
+ }
}
/* Perform a checkpoint. */
if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0)) != 0) {
dbenv->err(dbenv, ret,
"Could not perform checkpoint.\n");
+ if (pfinfo != NULL)
+ free(pfinfo);
return ((void *)EXIT_FAILURE);
}
+ /* Check this thread's PERM_FAILED indicator. */
+ if (shared->is_repmgr) {
+ pfinfo = (permfail_t *)thread_getspecific(
+ permfail_key);
+ if (pfinfo->flag)
+ printf("%s Thread: checkpoint not durable.\n",
+ pfinfo->thread_name);
+ pfinfo->flag = 0;
+ }
}
}
diff --git a/examples/c/ex_rep/common/rep_common.h b/examples/c/ex_rep/common/rep_common.h
index edf3b67d..8f7d1348 100644
--- a/examples/c/ex_rep/common/rep_common.h
+++ b/examples/c/ex_rep/common/rep_common.h
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -33,6 +33,7 @@ typedef struct {
int is_master;
int app_finished;
int in_client_sync;
+ int is_repmgr;
} SHARED_DATA;
/* Arguments for support threads. */
@@ -41,6 +42,15 @@ typedef struct {
SHARED_DATA *shared;
} supthr_args;
+/*
+ * Per-thread Replication Manager structure to associate a PERM_FAILED event
+ * with its originating transaction.
+ */
+typedef struct {
+ char *thread_name;
+ int flag;
+} permfail_t;
+
/* Portability macros for basic threading & timing */
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
@@ -59,6 +69,16 @@ typedef HANDLE thread_t;
((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \
GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
+/* Thread-specific data key for PERM_FAILED structure for Windows. */
+extern DWORD permfail_key;
+/* Thread local storage routine definitions for Windows. */
+#define thread_key_create(keyp) ((*keyp = TlsAlloc()) == \
+ TLS_OUT_OF_INDEXES ? (int)GetLastError() : 0)
+#define thread_key_delete(key) (TlsFree(key) ? 0 : (int)GetLastError())
+#define thread_setspecific(key, value) (TlsSetValue(key, value) ? 0 : \
+ (int)GetLastError())
+#define thread_getspecific(key) TlsGetValue(key)
+
#else /* !_WIN32 */
#include <sys/time.h>
#include <pthread.h>
@@ -68,6 +88,14 @@ typedef pthread_t thread_t;
pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))
+/* Thread-specific data key for PERM_FAILED structure for Posix. */
+extern pthread_key_t permfail_key;
+/* Thread local storage routine definitions for Posix. */
+#define thread_key_create(keyp) pthread_key_create(keyp, NULL)
+#define thread_key_delete(key) pthread_key_delete(key)
+#define thread_setspecific(key, value) pthread_setspecific(key, value)
+#define thread_getspecific(key) pthread_getspecific(key)
+
#endif
void *checkpoint_thread __P((void *));
diff --git a/examples/c/ex_rep/mgr/rep_mgr.c b/examples/c/ex_rep/mgr/rep_mgr.c
index 0eaf1971..5c6d15f5 100644
--- a/examples/c/ex_rep/mgr/rep_mgr.c
+++ b/examples/c/ex_rep/mgr/rep_mgr.c
@@ -1,7 +1,7 @@
/*-
* See the file LICENSE for redistribution information.
*
- * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved.
*
* $Id$
*/
@@ -115,6 +115,10 @@ main(argc, argv)
dbenv->err(dbenv, ret,
"Could not set heartbeat monitor timeout.\n");
+ /* Create thread-specific data key for PERM_FAILED structure. */
+ if ((ret = thread_key_create(&permfail_key)) != 0)
+ goto err;
+
/*
* The following repmgr features may also be useful to your
* application. See Berkeley DB documentation for more details.
@@ -133,6 +137,7 @@ main(argc, argv)
/* Start checkpoint and log archive threads. */
sup_args.dbenv = dbenv;
sup_args.shared = &my_app_data.shared_data;
+ my_app_data.shared_data.is_repmgr = 1;
if ((ret = start_support_threads(dbenv, &sup_args, &ckp_thr,
&lga_thr)) != 0)
goto err;
@@ -163,6 +168,10 @@ main(argc, argv)
goto err;
}
+ /* Delete thread-specific data key for PERM_FAILED structure. */
+ if ((ret = thread_key_delete(permfail_key)) != 0)
+ goto err;
+
err:
if (dbenv != NULL &&
(t_ret = dbenv->close(dbenv, 0)) != 0) {
@@ -183,8 +192,10 @@ event_callback(dbenv, which, info)
{
APP_DATA *app = dbenv->app_private;
SHARED_DATA *shared = &app->shared_data;
+ permfail_t *pfinfo;
int err;
+ pfinfo = NULL;
switch (which) {
case DB_EVENT_PANIC:
@@ -213,8 +224,12 @@ event_callback(dbenv, which, info)
* transaction will be flushed to the master site's
* local disk storage for durability.
*/
+ /* Set this thread's PERM_FAILED indicator. */
+ pfinfo = (permfail_t *)thread_getspecific(permfail_key);
+ printf("%s Thread: ", pfinfo->thread_name);
+ pfinfo->flag = 1;
printf(
- "Insufficient acknowledgements to guarantee transaction durability.\n");
+ "Insufficient acknowledgements for transaction durability.\n");
break;
case DB_EVENT_REP_STARTUPDONE: