summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrank Ch. Eigler <fche@redhat.com>2020-01-19 20:33:32 -0500
committerFrank Ch. Eigler <fche@redhat.com>2020-01-19 20:40:00 -0500
commit34e67018914cf9ebbef07065965755b6554fd66e (patch)
tree1f56d24e0734a9b23d6318e6819a66b3acb46938
parentc02dfd95e3db08460dfc61794564fbe4c05dc396 (diff)
downloadelfutils-fche/debuginfod-PR25394.tar.gz
PR25394: debuginfod mutex between grooming and scanningfche/debuginfod-PR25394
Extended the work-queue concept with "idlers" - other threads that block on the work queue until it becomes empty (rather than normal consumers that block on it until it becomes non-empty). Use this facility for the groomer thread to avoid working at the same time as the scanner threads. Use this for the fts traversal thread for similar reasons. One user-visible effect: response to SIGUSR1 and SIGUSR2 will wait until the work queue runs empty, but the man page was unspecific so does not need changing. It's not obvious how to test this with a tests/ dataset so small that scanning takes negligible time, so the former races are very tight. P.S. We also evaluated using sqlite level transactions to isolate the scanner thread groups-of-operations from the groomer. These experiments failed to produce a nominally concurrent debuginfod, having triggered "database locked" type errors. So we remain single-threaded (fully serialized) at the sqlite API level.
-rw-r--r--debuginfod/ChangeLog11
-rw-r--r--debuginfod/debuginfod.cxx128
-rw-r--r--tests/ChangeLog4
-rwxr-xr-xtests/run-debuginfod-find.sh1
4 files changed, 100 insertions, 44 deletions
diff --git a/debuginfod/ChangeLog b/debuginfod/ChangeLog
index 35130b2a..83b94a11 100644
--- a/debuginfod/ChangeLog
+++ b/debuginfod/ChangeLog
@@ -1,3 +1,14 @@
+2020-01-19 Frank Ch. Eigler <fche@redhat.com>
+
+ * debuginfod.cxx (scanq): Rework to let groomer/fts threads
+ synchronize with an empty workqueue, and lock out workqueue
+ consumers.
+ (thread_groom): Adopt new scanq idle APIs to lock out scanners.
+ (thread_main_fts_source_paths): Adopt new scanq idler API to
+ avoid being restarted while scanners haven't even finished yet.
+ (thread_main_*): Increment thread_work_total metric only after
+ a work cycle is completed, not when it begins.
+
2020-01-18 Frank Ch. Eigler <fche@redhat.com>
* debuginfod.cxx (thread_main_scanner): Handle empty source_paths[].
diff --git a/debuginfod/debuginfod.cxx b/debuginfod/debuginfod.cxx
index bf680048..b93e29ff 100644
--- a/debuginfod/debuginfod.cxx
+++ b/debuginfod/debuginfod.cxx
@@ -374,7 +374,7 @@ static struct argp argp =
static string db_path;
-static sqlite3 *db;
+static sqlite3 *db; // single connection, serialized across all our threads!
static unsigned verbose;
static volatile sig_atomic_t interrupted = 0;
static volatile sig_atomic_t sigusr1 = 0;
@@ -537,23 +537,25 @@ struct elfutils_exception: public reportable_exception
template <typename Payload>
class workq
{
- deque<Payload> q;
+ set<Payload> q; // eliminate duplicates
mutex mtx;
condition_variable cv;
bool dead;
+ unsigned idlers;
public:
- workq() { dead = false;}
+ workq() { dead = false; idlers = 0; }
~workq() {}
void push_back(const Payload& p)
{
unique_lock<mutex> lock(mtx);
- q.push_back(p);
+ q.insert (p);
set_metric("thread_work_pending","role","scan", q.size());
- cv.notify_one();
+ cv.notify_all();
}
+ // kill this workqueue, wake up all idlers / scanners
void nuke() {
unique_lock<mutex> lock(mtx);
// optional: q.clear();
@@ -561,28 +563,53 @@ public:
cv.notify_all();
}
+ // block this scanner thread until there is work to do and no active
bool wait_front (Payload& p)
{
unique_lock<mutex> lock(mtx);
- while (q.size() == 0 && !dead)
+ while (!dead && (q.size() == 0 || idlers > 0))
cv.wait(lock);
if (dead)
return false;
else
{
- p = q.front();
- q.pop_front();
+ p = * q.begin();
+ q.erase (q.begin());
set_metric("thread_work_pending","role","scan", q.size());
+ if (q.size() == 0)
+ cv.notify_all(); // maybe wake up waiting idlers
return true;
}
}
+
+ // block this idler thread until there is no work to do
+ void wait_idle ()
+ {
+ unique_lock<mutex> lock(mtx);
+ cv.notify_all(); // maybe wake up waiting scanners
+ while (!dead && (q.size() != 0))
+ cv.wait(lock);
+ idlers ++;
+ }
+
+ void done_idle ()
+ {
+ unique_lock<mutex> lock(mtx);
+ idlers --;
+ cv.notify_all(); // maybe wake up waiting scanners, but probably not (shutting down)
+ }
};
typedef struct stat stat_t;
typedef pair<string,stat_t> scan_payload;
+inline bool operator< (const scan_payload& a, const scan_payload& b)
+{
+ return a.first < b.first; // don't bother compare the stat fields
+}
static workq<scan_payload> scanq; // just a single one
-// producer: thread_main_fts_source_paths()
+// producer & idler: thread_main_fts_source_paths()
// consumer: thread_main_scanner()
+// idler: thread_main_groom()
@@ -2309,7 +2336,6 @@ thread_main_scanner (void* arg)
bool gotone = scanq.wait_front(p);
add_metric("thread_busy", "role", "scan", 1);
if (! gotone) continue; // or break
- inc_metric("thread_work_total", "role","scan");
try
{
@@ -2347,6 +2373,8 @@ thread_main_scanner (void* arg)
{
e.report(cerr);
}
+
+ inc_metric("thread_work_total", "role","scan");
}
add_metric("thread_busy", "role", "scan", -1);
@@ -2448,41 +2476,44 @@ thread_main_fts_source_paths (void* arg)
{
(void) arg; // ignore; we operate on global data
- unsigned rescan_timer = 0;
sig_atomic_t forced_rescan_count = 0;
- set_metric("thread_timer_max", "role","traverse", rescan_s);
set_metric("thread_tid", "role","traverse", tid());
add_metric("thread_count", "role", "traverse", 1);
+
+ time_t last_rescan = 0;
+
while (! interrupted)
{
- set_metric("thread_timer", "role","traverse", rescan_timer);
- // set_metric("thread_forced_total", "role","traverse", forced_rescan_count);
- if (rescan_s && rescan_timer > rescan_s)
- rescan_timer = 0;
+ sleep (1);
+ scanq.wait_idle(); // don't start a new traversal while scanners haven't finished the job
+ scanq.done_idle(); // release the hounds
+ if (interrupted) break;
+
+ time_t now = time(NULL);
+ bool rescan_now = false;
+ if (last_rescan == 0) // at least one initial rescan is documented even for -t0
+ rescan_now = true;
+ if (rescan_s > 0 && now > last_rescan + rescan_s)
+ rescan_now = true;
if (sigusr1 != forced_rescan_count)
{
forced_rescan_count = sigusr1;
- rescan_timer = 0;
+ rescan_now = true;
}
- if (rescan_timer == 0)
+ if (rescan_now)
try
{
set_metric("thread_busy", "role","traverse", 1);
- inc_metric("thread_work_total", "role","traverse");
scan_source_paths();
+ inc_metric("thread_work_total", "role","traverse");
set_metric("thread_busy", "role","traverse", 0);
}
catch (const reportable_exception& e)
{
e.report(cerr);
}
- sleep (1);
- rescan_timer ++;
}
- // wake up any blocked scanning threads so they can check $interrupted and kill themselves
- scanq.nuke();
-
return 0;
}
@@ -2589,36 +2620,44 @@ void groom()
static void*
thread_main_groom (void* /*arg*/)
{
- unsigned groom_timer = 0;
sig_atomic_t forced_groom_count = 0;
- set_metric("thread_timer_max", "role", "groom", groom_s);
set_metric("thread_tid", "role", "groom", tid());
add_metric("thread_count", "role", "groom", 1);
- while (! interrupted)
+
+ time_t last_groom = 0;
+
+ while (1)
{
- set_metric("thread_timer", "role", "groom", groom_timer);
- // set_metric("thread_forced_total", "role", "groom", forced_groom_count);
- if (groom_s && groom_timer > groom_s)
- groom_timer = 0;
+ sleep (1);
+ scanq.wait_idle(); // PR25394: block scanners during grooming!
+ if (interrupted) break;
+
+ time_t now = time(NULL);
+ bool groom_now = false;
+ if (last_groom == 0) // at least one initial groom is documented even for -g0
+ groom_now = true;
+ if (groom_s > 0 && now > last_groom + groom_s)
+ groom_now = true;
if (sigusr2 != forced_groom_count)
{
forced_groom_count = sigusr2;
- groom_timer = 0;
+ groom_now = true;
}
- if (groom_timer == 0)
+ if (groom_now)
try
{
set_metric("thread_busy", "role", "groom", 1);
- inc_metric("thread_work_total", "role", "groom");
groom ();
+ last_groom = time(NULL); // NB: now was before grooming
+ inc_metric("thread_work_total", "role", "groom");
set_metric("thread_busy", "role", "groom", 0);
}
catch (const sqlite_exception& e)
{
obatched(cerr) << e.message << endl;
}
- sleep (1);
- groom_timer ++;
+
+ scanq.done_idle();
}
return 0;
@@ -2860,26 +2899,27 @@ main (int argc, char *argv[])
if (du && du[0] != '\0') // set to non-empty string?
obatched(clog) << "upstream debuginfod servers: " << du << endl;
- vector<pthread_t> scanner_threads;
- pthread_t groom_thread;
+ vector<pthread_t> all_threads;
- rc = pthread_create (& groom_thread, NULL, thread_main_groom, NULL);
+ pthread_t pt;
+ rc = pthread_create (& pt, NULL, thread_main_groom, NULL);
if (rc < 0)
error (0, 0, "warning: cannot spawn thread (%d) to groom database\n", rc);
+ else
+ all_threads.push_back(pt);
if (scan_files || scan_archives.size() > 0)
{
- pthread_t pt;
pthread_create (& pt, NULL, thread_main_fts_source_paths, NULL);
if (rc < 0)
error (0, 0, "warning: cannot spawn thread (%d) to traverse source paths\n", rc);
- scanner_threads.push_back(pt);
+ all_threads.push_back(pt);
for (unsigned i=0; i<concurrency; i++)
{
pthread_create (& pt, NULL, thread_main_scanner, NULL);
if (rc < 0)
error (0, 0, "warning: cannot spawn thread (%d) to scan source files / archives\n", rc);
- scanner_threads.push_back(pt);
+ all_threads.push_back(pt);
}
}
@@ -2887,15 +2927,15 @@ main (int argc, char *argv[])
set_metric("ready", 1);
while (! interrupted)
pause ();
+ scanq.nuke(); // wake up any remaining scanq-related threads, let them die
set_metric("ready", 0);
if (verbose)
obatched(clog) << "stopping" << endl;
/* Join all our threads. */
- for (auto&& it : scanner_threads)
+ for (auto&& it : all_threads)
pthread_join (it, NULL);
- pthread_join (groom_thread, NULL);
/* Stop all the web service threads. */
if (d4) MHD_stop_daemon (d4);
diff --git a/tests/ChangeLog b/tests/ChangeLog
index 8936e410..8b032041 100644
--- a/tests/ChangeLog
+++ b/tests/ChangeLog
@@ -1,3 +1,7 @@
+2020-01-19 Frank Ch. Eigler <fche@redhat.com>
+
+ * run-debuginfod-find.sh: Check for proper groom completion count.
+
2020-01-18 Frank Ch. Eigler <fche@redhat.com>
* run-debuginfod-find.sh: Test empty source_paths[].
diff --git a/tests/run-debuginfod-find.sh b/tests/run-debuginfod-find.sh
index 2a3d591b..33bbd1a6 100755
--- a/tests/run-debuginfod-find.sh
+++ b/tests/run-debuginfod-find.sh
@@ -257,6 +257,7 @@ RPM_BUILDID=d44d42cbd7d915bc938c81333a21e355a6022fb7 # in rhel6/ subdir, for a l
rm -r R/debuginfod-rpms/rhel6/*
kill -USR2 $PID1 # groom cycle
# Expect 3 rpms to be deleted by the groom
+wait_ready $PORT1 'thread_work_total{role="groom"}' 1
wait_ready $PORT1 'groom{statistic="file d/e"}' 3
rm -rf $DEBUGINFOD_CACHE_PATH # clean it from previous tests