diff options
| author | Lorry Tar Creator <lorry-tar-importer@baserock.org> | 2015-02-17 17:25:57 +0000 |
|---|---|---|
| committer | <> | 2015-03-17 16:26:24 +0000 |
| commit | 780b92ada9afcf1d58085a83a0b9e6bc982203d1 (patch) | |
| tree | 598f8b9fa431b228d29897e798de4ac0c1d3d970 /examples/c/ex_rep | |
| parent | 7a2660ba9cc2dc03a69ddfcfd95369395cc87444 (diff) | |
| download | berkeleydb-master.tar.gz | |
Diffstat (limited to 'examples/c/ex_rep')
| -rw-r--r-- | examples/c/ex_rep/base/rep_base.c | 4 | ||||
| -rw-r--r-- | examples/c/ex_rep/base/rep_base.h | 7 | ||||
| -rw-r--r-- | examples/c/ex_rep/base/rep_msg.c | 66 | ||||
| -rw-r--r-- | examples/c/ex_rep/base/rep_net.c | 52 | ||||
| -rw-r--r-- | examples/c/ex_rep/common/rep_common.c | 76 | ||||
| -rw-r--r-- | examples/c/ex_rep/common/rep_common.h | 30 | ||||
| -rw-r--r-- | examples/c/ex_rep/mgr/rep_mgr.c | 19 |
7 files changed, 228 insertions, 26 deletions
diff --git a/examples/c/ex_rep/base/rep_base.c b/examples/c/ex_rep/base/rep_base.c index c9443e4d..56662220 100644 --- a/examples/c/ex_rep/base/rep_base.c +++ b/examples/c/ex_rep/base/rep_base.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -169,6 +169,8 @@ main(argc, argv) goto err; } + machtab_destroy(machtab); + /* Finish checkpoint and log archive threads. */ if ((ret = finish_support_threads(&ckp_thr, &lga_thr)) != 0) goto err; diff --git a/examples/c/ex_rep/base/rep_base.h b/examples/c/ex_rep/base/rep_base.h index 27901349..48d5d3ac 100644 --- a/examples/c/ex_rep/base/rep_base.h +++ b/examples/c/ex_rep/base/rep_base.h @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -71,6 +71,8 @@ typedef SOCKET socket_t; #define readsocket(s, buf, sz) recv((s), (buf), (int)(sz), 0) #define writesocket(s, buf, sz) send((s), (const char *)(buf), (int)(sz), 0) #define net_errno WSAGetLastError() +#undef SHUT_RDWR +#define SHUT_RDWR SD_BOTH #else /* !_WIN32 */ @@ -104,9 +106,10 @@ socket_t get_accepted_socket __P((const char *, int)); socket_t get_connected_socket __P((machtab_t *, const char *, const char *, int, int *, int *)); int get_next_message __P((socket_t, DBT *, DBT *)); -socket_t listen_socket_init __P((const char *, int)); +socket_t listen_socket_init __P((const char *, int, machtab_t *)); socket_t listen_socket_accept __P((machtab_t *, const char *, socket_t, int *)); +int machtab_destroy __P((machtab_t *)); int machtab_getinfo __P((machtab_t *, int, u_int32_t *, int *)); int machtab_init __P((machtab_t **, int)); void machtab_parm __P((machtab_t *, int *, u_int32_t *)); diff --git a/examples/c/ex_rep/base/rep_msg.c b/examples/c/ex_rep/base/rep_msg.c index 8ed45f7f..bbcaed45 100644 --- a/examples/c/ex_rep/base/rep_msg.c +++ b/examples/c/ex_rep/base/rep_msg.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -80,6 +80,10 @@ hm_loop(args) for (ret = 0; ret == 0;) { if ((ret = get_next_message(fd, &rec, &control)) != 0) { + if (app->shared_data.app_finished) { + ret = 0; + goto netclose; + } /* * Close this connection; if it's the master call * for an election. @@ -183,6 +187,7 @@ hm_loop(args) "thread join failure"); goto out; } + free(ea); ea = NULL; } if ((ea = calloc(sizeof(elect_args), 1)) == NULL) { @@ -219,15 +224,21 @@ hm_loop(args) out: if ((t_ret = machtab_rem(tab, eid, 1)) != 0 && ret == 0) ret = t_ret; - +netclose: /* Don't close the environment before any children exit. */ - if (ea != NULL && thread_join(elect_thr, &status) != 0) - dbenv->errx(dbenv, "can't join election thread"); + if (ea != NULL) { + if (thread_join(elect_thr, &status) != 0) + dbenv->errx(dbenv, "can't join election thread"); + free(ea); + ea = NULL; + } - if (site_thrs != NULL) + if (site_thrs != NULL) { while (--nsites >= 0) if (thread_join(site_thrs[nsites], &status) != 0) dbenv->errx(dbenv, "can't join site thread"); + free(site_thrs); + } return ((void *)(uintptr_t)ret); } @@ -242,6 +253,7 @@ connect_thread(args) void *args; { DB_ENV *dbenv; + APP_DATA *app; const char *home, *progname; hm_loop_args *ha; connect_args *cargs; @@ -252,18 +264,20 @@ connect_thread(args) socket_t fd, ns; ha = NULL; + i = 0; cargs = (connect_args *)args; dbenv = cargs->dbenv; home = cargs->home; progname = cargs->progname; machtab = cargs->machtab; port = cargs->port; + app = dbenv->app_private; /* * Loop forever, accepting connections from new machines, * and forking off a thread to handle each. */ - if ((fd = listen_socket_init(progname, port)) < 0) { + if ((fd = listen_socket_init(progname, port, machtab)) < 0) { ret = errno; goto err; } @@ -271,7 +285,10 @@ connect_thread(args) for (i = 0; i < MAX_THREADS; i++) { if ((ns = listen_socket_accept(machtab, progname, fd, &eid)) == SOCKET_CREATION_FAILURE) { - ret = errno; + if (app->shared_data.app_finished) + ret = 0; + else + ret = errno; goto err; } if ((ha = calloc(sizeof(hm_loop_args), 1)) == NULL) { @@ -285,7 +302,7 @@ connect_thread(args) ha->eid = eid; ha->tab = machtab; ha->dbenv = dbenv; - if ((ret = thread_create(&hm_thrs[i++], NULL, + if ((ret = thread_create(&hm_thrs[i], NULL, hm_loop, (void *)ha)) != 0) { dbenv->errx(dbenv, "can't create thread for site"); goto err; @@ -297,12 +314,13 @@ connect_thread(args) dbenv->errx(dbenv, "Too many threads"); ret = ENOMEM; +err: /* Do not return until all threads have exited. */ while (--i >= 0) if (thread_join(hm_thrs[i], &status) != 0) dbenv->errx(dbenv, "can't join site thread"); -err: return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE); + return (ret == 0 ? (void *)EXIT_SUCCESS : (void *)EXIT_FAILURE); } /* @@ -314,15 +332,15 @@ connect_all(args) void *args; { DB_ENV *dbenv; + APP_DATA *app; all_args *aa; const char *home, *progname; - hm_loop_args *ha; int failed, i, nsites, open, ret, *success; machtab_t *machtab; - thread_t *hm_thr; + thread_t empty_id, *hm_thr; repsite_t *sites; + void *status; - ha = NULL; aa = (all_args *)args; dbenv = aa->dbenv; progname = aa->progname; @@ -330,6 +348,9 @@ connect_all(args) machtab = aa->machtab; nsites = aa->nsites; sites = aa->sites; + status = NULL; + app = dbenv->app_private; + memset(&empty_id, 0, sizeof(thread_t)); ret = 0; hm_thr = NULL; @@ -353,6 +374,9 @@ connect_all(args) if (success[i]) continue; + if (app->shared_data.app_finished) + goto err; + ret = connect_site(dbenv, machtab, progname, &sites[i], &open, &hm_thr[i]); @@ -378,7 +402,15 @@ connect_all(args) sleep(1); } -err: if (success != NULL) +err: + for (i = 0; i < nsites; i++) { + if (!success[i] || + memcmp(&hm_thr[i], &empty_id, sizeof(thread_t)) == 0) + continue; + if (thread_join(hm_thr[i], &status) != 0) + dbenv->errx(dbenv, "can't join site thread"); + } + if (success != NULL) free(success); if (hm_thr != NULL) free(hm_thr); @@ -453,12 +485,16 @@ elect_thread(args) machtab_parm(machtab, &n, &timeout); (void)dbenv->rep_set_timeout(dbenv, DB_REP_ELECTION_TIMEOUT, timeout); - while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0) + while ((ret = dbenv->rep_elect(dbenv, n, (n/2+1), 0)) != 0) { + if (app->shared_data.app_finished) + return (NULL); sleep(2); + } if (app->elected) { app->elected = 0; - if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) + if ((ret = dbenv->rep_start(dbenv, NULL, + DB_REP_MASTER)) != 0 && !app->shared_data.app_finished) dbenv->err(dbenv, ret, "can't start as master in election thread"); } diff --git a/examples/c/ex_rep/base/rep_net.c b/examples/c/ex_rep/base/rep_net.c index 29012536..e57e3af8 100644 --- a/examples/c/ex_rep/base/rep_net.c +++ b/examples/c/ex_rep/base/rep_net.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -70,6 +70,7 @@ struct __machtab { int current; int max; int nsites; + socket_t listenfd; }; /* Data structure that describes each entry in the machtab. */ @@ -113,6 +114,7 @@ machtab_init(machtabp, nsites) machtab->timeout_time = 2 * 1000000; /* 2 seconds. */ machtab->current = machtab->max = 0; machtab->nsites = nsites; + machtab->listenfd = SOCKET_CREATION_FAILURE; ret = mutex_init(&machtab->mtmutex, NULL); *machtabp = machtab; @@ -265,6 +267,47 @@ machtab_rem(machtab, eid, lock) return (ret); } +/* + * machtab_destroy -- + * Close the listening socket and all connecting sockets, + * So that all threads blocked on 'accept' and 'read' will be + * unblocked. + */ +int machtab_destroy(machtab) + machtab_t *machtab; +{ + int ret; + socket_t listenfd; + member_t *member; + + if ((ret = mutex_lock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't lock mutex\n"); + return (ret); + } + + listenfd = machtab->listenfd; + machtab->listenfd = SOCKET_CREATION_FAILURE; + for (member = LIST_FIRST(&machtab->machlist); + member != NULL; + member = LIST_FIRST(&machtab->machlist)) { + LIST_REMOVE(member, links); + shutdown(member->fd, SHUT_RDWR); + (void)closesocket(member->fd); + free(member); + machtab->current--; + } + shutdown(listenfd, SHUT_RDWR); + (void)closesocket(listenfd); + machtab->nextid = 2; + machtab->current = machtab->max = 0; + + if ((ret = mutex_unlock(&machtab->mtmutex)) != 0) { + fprintf(stderr, "can't unlock mutex\n"); + return (ret); + } + return (ret); +} + void machtab_parm(machtab, nump, timeoutp) machtab_t *machtab; @@ -310,9 +353,10 @@ machtab_print(machtab) * in a thread that we're happy to let block. */ socket_t -listen_socket_init(progname, port) +listen_socket_init(progname, port, machtab) const char *progname; int port; + machtab_t *machtab; { socket_t s; int sockopt; @@ -349,6 +393,7 @@ listen_socket_init(progname, port) goto err; } + machtab->listenfd = s; return (s); err: closesocket(s); @@ -380,7 +425,8 @@ accept_wait: si_len = sizeof(si); ns = accept(s, (struct sockaddr *)&si, &si_len); if (ns == SOCKET_CREATION_FAILURE) { - fprintf(stderr, "can't accept incoming connection\n"); + if (machtab->listenfd != SOCKET_CREATION_FAILURE) + fprintf(stderr, "can't accept incoming connection\n"); return ns; } host = ntohl(si.sin_addr.s_addr); diff --git a/examples/c/ex_rep/common/rep_common.c b/examples/c/ex_rep/common/rep_common.c index a091adda..d7d2585a 100644 --- a/examples/c/ex_rep/common/rep_common.c +++ b/examples/c/ex_rep/common/rep_common.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -19,6 +19,18 @@ #define DATABASE "quote.db" #define SLEEPTIME 3 +/* + * Definition of thread-specific data key for PERM_FAILED structure + * stored in thread local storage. + */ +#ifdef _WIN32 +/* Windows style. */ +DWORD permfail_key; +#else +/* Posix style. */ +pthread_key_t permfail_key; +#endif + static int print_stocks __P((DB *)); /* @@ -346,15 +358,27 @@ doloop(dbenv, shared_data) { DB *dbp; DBT key, data; + permfail_t *pfinfo; char buf[BUFSIZE], *first, *price; u_int32_t flags; int ret; dbp = NULL; + pfinfo = NULL; ret = 0; memset(&key, 0, sizeof(key)); memset(&data, 0, sizeof(data)); + /* Allocate put/commit thread's PERM_FAILED structure. */ + if (shared_data->is_repmgr) { + if ((pfinfo = malloc(sizeof(permfail_t))) == NULL) + goto err; + if ((ret = thread_setspecific(permfail_key, pfinfo)) != 0) + goto err; + pfinfo->thread_name = "PutCommit"; + pfinfo->flag = 0; + } + for (;;) { printf("QUOTESERVER%s> ", shared_data->is_master ? "" : " (read-only)"); @@ -431,6 +455,16 @@ doloop(dbenv, shared_data) dbenv->err(dbenv, ret, "DB->open"); goto err; } + /* Check this thread's PERM_FAILED indicator. */ + if (shared_data->is_repmgr) { + pfinfo = (permfail_t *)thread_getspecific( + permfail_key); + if (pfinfo->flag) + printf( + "%s Thread: dbopen not durable.\n", + pfinfo->thread_name); + pfinfo->flag = 0; + } } if (first == NULL) { @@ -470,11 +504,23 @@ doloop(dbenv, shared_data) dbp->err(dbp, ret, "DB->put"); goto err; } + /* Check this thread's PERM_FAILED indicator. */ + if (shared_data->is_repmgr) { + pfinfo = (permfail_t *)thread_getspecific( + permfail_key); + if (pfinfo->flag) + printf( + "%s Thread: put %s %s not durable.\n", + pfinfo->thread_name, first, price); + pfinfo->flag = 0; + } } } err: if (dbp != NULL) (void)dbp->close(dbp, DB_NOSYNC); + if (pfinfo != NULL) + free(pfinfo); return (ret); } @@ -575,12 +621,24 @@ checkpoint_thread(args) { DB_ENV *dbenv; SHARED_DATA *shared; + permfail_t *pfinfo; supthr_args *ca; int i, ret; ca = (supthr_args *)args; dbenv = ca->dbenv; shared = ca->shared; + pfinfo = NULL; + + /* Allocate checkpoint thread's PERM_FAILED structure. */ + if (shared->is_repmgr) { + if ((pfinfo = malloc(sizeof(permfail_t))) == NULL) + return ((void *)EXIT_FAILURE); + if ((ret = thread_setspecific(permfail_key, pfinfo)) != 0) + return ((void *)EXIT_FAILURE); + pfinfo->thread_name = "Checkpoint"; + pfinfo->flag = 0; + } for (;;) { /* @@ -590,16 +648,30 @@ checkpoint_thread(args) */ for (i = 0; i < 60; i++) { sleep(1); - if (shared->app_finished == 1) + if (shared->app_finished == 1) { + if (pfinfo != NULL) + free(pfinfo); return ((void *)EXIT_SUCCESS); + } } /* Perform a checkpoint. */ if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0)) != 0) { dbenv->err(dbenv, ret, "Could not perform checkpoint.\n"); + if (pfinfo != NULL) + free(pfinfo); return ((void *)EXIT_FAILURE); } + /* Check this thread's PERM_FAILED indicator. */ + if (shared->is_repmgr) { + pfinfo = (permfail_t *)thread_getspecific( + permfail_key); + if (pfinfo->flag) + printf("%s Thread: checkpoint not durable.\n", + pfinfo->thread_name); + pfinfo->flag = 0; + } } } diff --git a/examples/c/ex_rep/common/rep_common.h b/examples/c/ex_rep/common/rep_common.h index edf3b67d..8f7d1348 100644 --- a/examples/c/ex_rep/common/rep_common.h +++ b/examples/c/ex_rep/common/rep_common.h @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2006, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -33,6 +33,7 @@ typedef struct { int is_master; int app_finished; int in_client_sync; + int is_repmgr; } SHARED_DATA; /* Arguments for support threads. */ @@ -41,6 +42,15 @@ typedef struct { SHARED_DATA *shared; } supthr_args; +/* + * Per-thread Replication Manager structure to associate a PERM_FAILED event + * with its originating transaction. + */ +typedef struct { + char *thread_name; + int flag; +} permfail_t; + /* Portability macros for basic threading & timing */ #ifdef _WIN32 #define WIN32_LEAN_AND_MEAN @@ -59,6 +69,16 @@ typedef HANDLE thread_t; ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1) +/* Thread-specific data key for PERM_FAILED structure for Windows. */ +extern DWORD permfail_key; +/* Thread local storage routine definitions for Windows. */ +#define thread_key_create(keyp) ((*keyp = TlsAlloc()) == \ + TLS_OUT_OF_INDEXES ? (int)GetLastError() : 0) +#define thread_key_delete(key) (TlsFree(key) ? 0 : (int)GetLastError()) +#define thread_setspecific(key, value) (TlsSetValue(key, value) ? 0 : \ + (int)GetLastError()) +#define thread_getspecific(key) TlsGetValue(key) + #else /* !_WIN32 */ #include <sys/time.h> #include <pthread.h> @@ -68,6 +88,14 @@ typedef pthread_t thread_t; pthread_create((thrp), (attr), (func), (arg)) #define thread_join(thr, statusp) pthread_join((thr), (statusp)) +/* Thread-specific data key for PERM_FAILED structure for Posix. */ +extern pthread_key_t permfail_key; +/* Thread local storage routine definitions for Posix. */ +#define thread_key_create(keyp) pthread_key_create(keyp, NULL) +#define thread_key_delete(key) pthread_key_delete(key) +#define thread_setspecific(key, value) pthread_setspecific(key, value) +#define thread_getspecific(key) pthread_getspecific(key) + #endif void *checkpoint_thread __P((void *)); diff --git a/examples/c/ex_rep/mgr/rep_mgr.c b/examples/c/ex_rep/mgr/rep_mgr.c index 0eaf1971..5c6d15f5 100644 --- a/examples/c/ex_rep/mgr/rep_mgr.c +++ b/examples/c/ex_rep/mgr/rep_mgr.c @@ -1,7 +1,7 @@ /*- * See the file LICENSE for redistribution information. * - * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2015 Oracle and/or its affiliates. All rights reserved. * * $Id$ */ @@ -115,6 +115,10 @@ main(argc, argv) dbenv->err(dbenv, ret, "Could not set heartbeat monitor timeout.\n"); + /* Create thread-specific data key for PERM_FAILED structure. */ + if ((ret = thread_key_create(&permfail_key)) != 0) + goto err; + /* * The following repmgr features may also be useful to your * application. See Berkeley DB documentation for more details. @@ -133,6 +137,7 @@ main(argc, argv) /* Start checkpoint and log archive threads. */ sup_args.dbenv = dbenv; sup_args.shared = &my_app_data.shared_data; + my_app_data.shared_data.is_repmgr = 1; if ((ret = start_support_threads(dbenv, &sup_args, &ckp_thr, &lga_thr)) != 0) goto err; @@ -163,6 +168,10 @@ main(argc, argv) goto err; } + /* Delete thread-specific data key for PERM_FAILED structure. */ + if ((ret = thread_key_delete(permfail_key)) != 0) + goto err; + err: if (dbenv != NULL && (t_ret = dbenv->close(dbenv, 0)) != 0) { @@ -183,8 +192,10 @@ event_callback(dbenv, which, info) { APP_DATA *app = dbenv->app_private; SHARED_DATA *shared = &app->shared_data; + permfail_t *pfinfo; int err; + pfinfo = NULL; switch (which) { case DB_EVENT_PANIC: @@ -213,8 +224,12 @@ event_callback(dbenv, which, info) * transaction will be flushed to the master site's * local disk storage for durability. */ + /* Set this thread's PERM_FAILED indicator. */ + pfinfo = (permfail_t *)thread_getspecific(permfail_key); + printf("%s Thread: ", pfinfo->thread_name); + pfinfo->flag = 1; printf( - "Insufficient acknowledgements to guarantee transaction durability.\n"); + "Insufficient acknowledgements for transaction durability.\n"); break; case DB_EVENT_REP_STARTUPDONE: |
