diff options
-rw-r--r-- | debuginfod/ChangeLog | 11 | ||||
-rw-r--r-- | debuginfod/debuginfod.cxx | 128 | ||||
-rw-r--r-- | tests/ChangeLog | 4 | ||||
-rwxr-xr-x | tests/run-debuginfod-find.sh | 1 |
4 files changed, 100 insertions, 44 deletions
diff --git a/debuginfod/ChangeLog b/debuginfod/ChangeLog index 35130b2a..83b94a11 100644 --- a/debuginfod/ChangeLog +++ b/debuginfod/ChangeLog @@ -1,3 +1,14 @@ +2020-01-19 Frank Ch. Eigler <fche@redhat.com> + + * debuginfod.cxx (scanq): Rework to let groomer/fts threads + synchronize with an empty workqueue, and lock out workqueue + consumers. + (thread_groom): Adopt new scanq idle APIs to lock out scanners. + (thread_main_fts_source_paths): Adopt new scanq idler API to + avoid being restarted while scanners haven't even finished yet. + (thread_main_*): Increment thread_work_total metric only after + a work cycle is completed, not when it begins. + 2020-01-18 Frank Ch. Eigler <fche@redhat.com> * debuginfod.cxx (thread_main_scanner): Handle empty source_paths[]. diff --git a/debuginfod/debuginfod.cxx b/debuginfod/debuginfod.cxx index bf680048..b93e29ff 100644 --- a/debuginfod/debuginfod.cxx +++ b/debuginfod/debuginfod.cxx @@ -374,7 +374,7 @@ static struct argp argp = static string db_path; -static sqlite3 *db; +static sqlite3 *db; // single connection, serialized across all our threads! 
static unsigned verbose; static volatile sig_atomic_t interrupted = 0; static volatile sig_atomic_t sigusr1 = 0; @@ -537,23 +537,25 @@ struct elfutils_exception: public reportable_exception template <typename Payload> class workq { - deque<Payload> q; + set<Payload> q; // eliminate duplicates mutex mtx; condition_variable cv; bool dead; + unsigned idlers; public: - workq() { dead = false;} + workq() { dead = false; idlers = 0; } ~workq() {} void push_back(const Payload& p) { unique_lock<mutex> lock(mtx); - q.push_back(p); + q.insert (p); set_metric("thread_work_pending","role","scan", q.size()); - cv.notify_one(); + cv.notify_all(); } + // kill this workqueue, wake up all idlers / scanners void nuke() { unique_lock<mutex> lock(mtx); // optional: q.clear(); @@ -561,28 +563,53 @@ public: cv.notify_all(); } + // block this scanner thread until there is work to do and no active idlers bool wait_front (Payload& p) { unique_lock<mutex> lock(mtx); - while (q.size() == 0 && !dead) + while (!dead && (q.size() == 0 || idlers > 0)) cv.wait(lock); if (dead) return false; else { - p = q.front(); - q.pop_front(); + p = * q.begin(); + q.erase (q.begin()); set_metric("thread_work_pending","role","scan", q.size()); + if (q.size() == 0) + cv.notify_all(); // maybe wake up waiting idlers return true; } } + + // block this idler thread until there is no work to do + void wait_idle () + { + unique_lock<mutex> lock(mtx); + cv.notify_all(); // maybe wake up waiting scanners + while (!dead && (q.size() != 0)) + cv.wait(lock); + idlers ++; + } + + void done_idle () + { + unique_lock<mutex> lock(mtx); + idlers --; + cv.notify_all(); // maybe wake up waiting scanners, but probably not (shutting down) + } }; typedef struct stat stat_t; typedef pair<string,stat_t> scan_payload; +inline bool operator< (const scan_payload& a, const scan_payload& b) +{ + return a.first < b.first; // don't bother comparing the stat fields +} static workq<scan_payload> scanq; // just a single one -// producer: 
thread_main_fts_source_paths() +// producer & idler: thread_main_fts_source_paths() // consumer: thread_main_scanner() +// idler: thread_main_groom() @@ -2309,7 +2336,6 @@ thread_main_scanner (void* arg) bool gotone = scanq.wait_front(p); add_metric("thread_busy", "role", "scan", 1); if (! gotone) continue; // or break - inc_metric("thread_work_total", "role","scan"); try { @@ -2347,6 +2373,8 @@ thread_main_scanner (void* arg) { e.report(cerr); } + + inc_metric("thread_work_total", "role","scan"); } add_metric("thread_busy", "role", "scan", -1); @@ -2448,41 +2476,44 @@ thread_main_fts_source_paths (void* arg) { (void) arg; // ignore; we operate on global data - unsigned rescan_timer = 0; sig_atomic_t forced_rescan_count = 0; - set_metric("thread_timer_max", "role","traverse", rescan_s); set_metric("thread_tid", "role","traverse", tid()); add_metric("thread_count", "role", "traverse", 1); + + time_t last_rescan = 0; + while (! interrupted) { - set_metric("thread_timer", "role","traverse", rescan_timer); - // set_metric("thread_forced_total", "role","traverse", forced_rescan_count); - if (rescan_s && rescan_timer > rescan_s) - rescan_timer = 0; + sleep (1); + scanq.wait_idle(); // don't start a new traversal while scanners haven't finished the job + scanq.done_idle(); // release the hounds + if (interrupted) break; + + time_t now = time(NULL); + bool rescan_now = false; + if (last_rescan == 0) // at least one initial rescan is documented even for -t0 + rescan_now = true; + if (rescan_s > 0 && now > last_rescan + rescan_s) + rescan_now = true; if (sigusr1 != forced_rescan_count) { forced_rescan_count = sigusr1; - rescan_timer = 0; + rescan_now = true; } - if (rescan_timer == 0) + if (rescan_now) try { set_metric("thread_busy", "role","traverse", 1); - inc_metric("thread_work_total", "role","traverse"); scan_source_paths(); + inc_metric("thread_work_total", "role","traverse"); set_metric("thread_busy", "role","traverse", 0); } catch (const reportable_exception& e) { 
e.report(cerr); } - sleep (1); - rescan_timer ++; } - // wake up any blocked scanning threads so they can check $interrupted and kill themselves - scanq.nuke(); - return 0; } @@ -2589,36 +2620,44 @@ void groom() static void* thread_main_groom (void* /*arg*/) { - unsigned groom_timer = 0; sig_atomic_t forced_groom_count = 0; - set_metric("thread_timer_max", "role", "groom", groom_s); set_metric("thread_tid", "role", "groom", tid()); add_metric("thread_count", "role", "groom", 1); - while (! interrupted) + + time_t last_groom = 0; + + while (1) { - set_metric("thread_timer", "role", "groom", groom_timer); - // set_metric("thread_forced_total", "role", "groom", forced_groom_count); - if (groom_s && groom_timer > groom_s) - groom_timer = 0; + sleep (1); + scanq.wait_idle(); // PR25394: block scanners during grooming! + if (interrupted) break; + + time_t now = time(NULL); + bool groom_now = false; + if (last_groom == 0) // at least one initial groom is documented even for -g0 + groom_now = true; + if (groom_s > 0 && now > last_groom + groom_s) + groom_now = true; if (sigusr2 != forced_groom_count) { forced_groom_count = sigusr2; - groom_timer = 0; + groom_now = true; } - if (groom_timer == 0) + if (groom_now) try { set_metric("thread_busy", "role", "groom", 1); - inc_metric("thread_work_total", "role", "groom"); groom (); + last_groom = time(NULL); // NB: now was before grooming + inc_metric("thread_work_total", "role", "groom"); set_metric("thread_busy", "role", "groom", 0); } catch (const sqlite_exception& e) { obatched(cerr) << e.message << endl; } - sleep (1); - groom_timer ++; + + scanq.done_idle(); } return 0; @@ -2860,26 +2899,27 @@ main (int argc, char *argv[]) if (du && du[0] != '\0') // set to non-empty string? 
obatched(clog) << "upstream debuginfod servers: " << du << endl; - vector<pthread_t> scanner_threads; - pthread_t groom_thread; + vector<pthread_t> all_threads; - rc = pthread_create (& groom_thread, NULL, thread_main_groom, NULL); + pthread_t pt; + rc = pthread_create (& pt, NULL, thread_main_groom, NULL); if (rc < 0) error (0, 0, "warning: cannot spawn thread (%d) to groom database\n", rc); + else + all_threads.push_back(pt); if (scan_files || scan_archives.size() > 0) { - pthread_t pt; pthread_create (& pt, NULL, thread_main_fts_source_paths, NULL); if (rc < 0) error (0, 0, "warning: cannot spawn thread (%d) to traverse source paths\n", rc); - scanner_threads.push_back(pt); + all_threads.push_back(pt); for (unsigned i=0; i<concurrency; i++) { pthread_create (& pt, NULL, thread_main_scanner, NULL); if (rc < 0) error (0, 0, "warning: cannot spawn thread (%d) to scan source files / archives\n", rc); - scanner_threads.push_back(pt); + all_threads.push_back(pt); } } @@ -2887,15 +2927,15 @@ main (int argc, char *argv[]) set_metric("ready", 1); while (! interrupted) pause (); + scanq.nuke(); // wake up any remaining scanq-related threads, let them die set_metric("ready", 0); if (verbose) obatched(clog) << "stopping" << endl; /* Join all our threads. */ - for (auto&& it : scanner_threads) + for (auto&& it : all_threads) pthread_join (it, NULL); - pthread_join (groom_thread, NULL); /* Stop all the web service threads. */ if (d4) MHD_stop_daemon (d4); diff --git a/tests/ChangeLog b/tests/ChangeLog index 8936e410..8b032041 100644 --- a/tests/ChangeLog +++ b/tests/ChangeLog @@ -1,3 +1,7 @@ +2020-01-19 Frank Ch. Eigler <fche@redhat.com> + + * run-debuginfod-find.sh: Check for proper groom completion count. + 2020-01-18 Frank Ch. Eigler <fche@redhat.com> * run-debuginfod-find.sh: Test empty source_paths[]. 
diff --git a/tests/run-debuginfod-find.sh b/tests/run-debuginfod-find.sh index 2a3d591b..33bbd1a6 100755 --- a/tests/run-debuginfod-find.sh +++ b/tests/run-debuginfod-find.sh @@ -257,6 +257,7 @@ RPM_BUILDID=d44d42cbd7d915bc938c81333a21e355a6022fb7 # in rhel6/ subdir, for a l rm -r R/debuginfod-rpms/rhel6/* kill -USR2 $PID1 # groom cycle # Expect 3 rpms to be deleted by the groom +wait_ready $PORT1 'thread_work_total{role="groom"}' 1 wait_ready $PORT1 'groom{statistic="file d/e"}' 3 rm -rf $DEBUGINFOD_CACHE_PATH # clean it from previous tests |