-rw-r--r--  debuginfod/ChangeLog            |  11
-rw-r--r--  debuginfod/debuginfod.cxx       | 128
-rw-r--r--  tests/ChangeLog                 |   4
-rwxr-xr-x  tests/run-debuginfod-find.sh    |   1
4 files changed, 100 insertions, 44 deletions
diff --git a/debuginfod/ChangeLog b/debuginfod/ChangeLog
index 35130b2a..83b94a11 100644
--- a/debuginfod/ChangeLog
+++ b/debuginfod/ChangeLog
@@ -1,3 +1,14 @@
+2020-01-19 Frank Ch. Eigler <fche@redhat.com>
+
+ * debuginfod.cxx (scanq): Rework to let groomer/fts threads
+ synchronize with an empty workqueue, and lock out workqueue
+ consumers.
+ (thread_groom): Adopt new scanq idle APIs to lock out scanners.
+ (thread_main_fts_source_paths): Adopt new scanq idler API to
+ avoid starting a new traversal while scanners haven't finished
+ the previous one.
+ (thread_main_*): Increment thread_work_total metric only after
+ a work cycle is completed, not when it begins.
+
2020-01-18 Frank Ch. Eigler <fche@redhat.com>
* debuginfod.cxx (thread_main_scanner): Handle empty source_paths[].
diff --git a/debuginfod/debuginfod.cxx b/debuginfod/debuginfod.cxx
index bf680048..b93e29ff 100644
--- a/debuginfod/debuginfod.cxx
+++ b/debuginfod/debuginfod.cxx
@@ -374,7 +374,7 @@ static struct argp argp =
static string db_path;
-static sqlite3 *db;
+static sqlite3 *db; // single connection, serialized across all our threads!
static unsigned verbose;
static volatile sig_atomic_t interrupted = 0;
static volatile sig_atomic_t sigusr1 = 0;
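The new comment on the db handle records an important constraint: a single sqlite connection is shared by all of debuginfod's threads, so access to it has to be serialized. For reference only (this is not the debuginfod code, the filename is a placeholder, and whether debuginfod relies on this flag or on its own locking is not shown in this hunk), sqlite's serialized threading mode can be requested per connection with SQLITE_OPEN_FULLMUTEX:

#include <sqlite3.h>
#include <cstdio>

int main ()
{
  sqlite3 *db = NULL;
  // SQLITE_OPEN_FULLMUTEX selects sqlite's serialized threading mode, so one
  // connection may be used from several threads; sqlite serializes the calls
  int rc = sqlite3_open_v2 ("metadata.sqlite", &db,
                            SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE
                            | SQLITE_OPEN_FULLMUTEX,
                            NULL);
  if (rc != SQLITE_OK)
    {
      fprintf (stderr, "cannot open database: %s\n", sqlite3_errmsg (db));
      sqlite3_close (db);
      return 1;
    }
  sqlite3_close (db);
  return 0;
}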
@@ -537,23 +537,25 @@ struct elfutils_exception: public reportable_exception
template <typename Payload>
class workq
{
- deque<Payload> q;
+ set<Payload> q; // eliminate duplicates
mutex mtx;
condition_variable cv;
bool dead;
+ unsigned idlers;
public:
- workq() { dead = false;}
+ workq() { dead = false; idlers = 0; }
~workq() {}
void push_back(const Payload& p)
{
unique_lock<mutex> lock(mtx);
- q.push_back(p);
+ q.insert (p);
set_metric("thread_work_pending","role","scan", q.size());
- cv.notify_one();
+ cv.notify_all();
}
+ // kill this workqueue, wake up all idlers / scanners
void nuke() {
unique_lock<mutex> lock(mtx);
// optional: q.clear();
@@ -561,28 +563,53 @@ public:
cv.notify_all();
}
+ // block this scanner thread until there is work to do and no active idler
bool wait_front (Payload& p)
{
unique_lock<mutex> lock(mtx);
- while (q.size() == 0 && !dead)
+ while (!dead && (q.size() == 0 || idlers > 0))
cv.wait(lock);
if (dead)
return false;
else
{
- p = q.front();
- q.pop_front();
+ p = * q.begin();
+ q.erase (q.begin());
set_metric("thread_work_pending","role","scan", q.size());
+ if (q.size() == 0)
+ cv.notify_all(); // maybe wake up waiting idlers
return true;
}
}
+
+ // block this idler thread until there is no work to do
+ void wait_idle ()
+ {
+ unique_lock<mutex> lock(mtx);
+ cv.notify_all(); // maybe wake up waiting scanners
+ while (!dead && (q.size() != 0))
+ cv.wait(lock);
+ idlers ++;
+ }
+
+ void done_idle ()
+ {
+ unique_lock<mutex> lock(mtx);
+ idlers --;
+ cv.notify_all(); // maybe wake up waiting scanners, but probably not (shutting down)
+ }
};
typedef struct stat stat_t;
typedef pair<string,stat_t> scan_payload;
+inline bool operator< (const scan_payload& a, const scan_payload& b)
+{
+ return a.first < b.first; // don't bother comparing the stat fields
+}
static workq<scan_payload> scanq; // just a single one
-// producer: thread_main_fts_source_paths()
+// producer & idler: thread_main_fts_source_paths()
// consumer: thread_main_scanner()
+// idler: thread_main_groom()
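For orientation, here is a minimal, self-contained sketch of the handshake the reworked workq<> implements: a set<> deduplicates queued payloads, scanners block in wait_front() whenever the queue is empty or an idler holds it, and idlers block in wait_idle() until the queue drains, then release the scanners with done_idle(). Names like toy_workq and the int payload are placeholders for illustration, not the debuginfod types (builds with g++ -pthread).

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <set>
#include <thread>

class toy_workq
{
  std::set<int> q;                 // a set<>, as in the patch, deduplicates payloads
  std::mutex mtx;
  std::condition_variable cv;
  bool dead = false;
  unsigned idlers = 0;

public:
  void push_back (int p)
  {
    std::unique_lock<std::mutex> lock (mtx);
    q.insert (p);
    cv.notify_all ();
  }

  void nuke ()                     // shut down: wake everyone so they can exit
  {
    std::unique_lock<std::mutex> lock (mtx);
    dead = true;
    cv.notify_all ();
  }

  // scanner side: wait for work, but only while no idler holds the queue
  bool wait_front (int& p)
  {
    std::unique_lock<std::mutex> lock (mtx);
    while (!dead && (q.empty () || idlers > 0))
      cv.wait (lock);
    if (dead)
      return false;
    p = *q.begin ();
    q.erase (q.begin ());
    if (q.empty ())
      cv.notify_all ();            // an idler may be waiting for the queue to drain
    return true;
  }

  // idler side (groomer / traverser): wait until the queue is empty, then hold it
  void wait_idle ()
  {
    std::unique_lock<std::mutex> lock (mtx);
    while (!dead && !q.empty ())
      cv.wait (lock);
    idlers++;
  }

  void done_idle ()
  {
    std::unique_lock<std::mutex> lock (mtx);
    idlers--;
    cv.notify_all ();              // scanners may resume
  }
};

int main ()
{
  toy_workq wq;
  std::thread scanner ([&wq] {
    int p;
    while (wq.wait_front (p))
      std::cout << "scanned " << p << std::endl;
  });

  for (int i = 0; i < 3; i++)
    wq.push_back (i);

  wq.wait_idle ();                 // returns once the scanner has drained the queue
  std::cout << "grooming while scanners are locked out" << std::endl;
  wq.done_idle ();

  wq.nuke ();
  scanner.join ();
  return 0;
}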
@@ -2309,7 +2336,6 @@ thread_main_scanner (void* arg)
bool gotone = scanq.wait_front(p);
add_metric("thread_busy", "role", "scan", 1);
if (! gotone) continue; // or break
- inc_metric("thread_work_total", "role","scan");
try
{
@@ -2347,6 +2373,8 @@ thread_main_scanner (void* arg)
{
e.report(cerr);
}
+
+ inc_metric("thread_work_total", "role","scan");
}
add_metric("thread_busy", "role", "scan", -1);
@@ -2448,41 +2476,44 @@ thread_main_fts_source_paths (void* arg)
{
(void) arg; // ignore; we operate on global data
- unsigned rescan_timer = 0;
sig_atomic_t forced_rescan_count = 0;
- set_metric("thread_timer_max", "role","traverse", rescan_s);
set_metric("thread_tid", "role","traverse", tid());
add_metric("thread_count", "role", "traverse", 1);
+
+ time_t last_rescan = 0;
+
while (! interrupted)
{
- set_metric("thread_timer", "role","traverse", rescan_timer);
- // set_metric("thread_forced_total", "role","traverse", forced_rescan_count);
- if (rescan_s && rescan_timer > rescan_s)
- rescan_timer = 0;
+ sleep (1);
+ scanq.wait_idle(); // don't start a new traversal while scanners haven't finished the job
+ scanq.done_idle(); // release the hounds
+ if (interrupted) break;
+
+ time_t now = time(NULL);
+ bool rescan_now = false;
+ if (last_rescan == 0) // at least one initial rescan is documented even for -t0
+ rescan_now = true;
+ if (rescan_s > 0 && now > last_rescan + rescan_s)
+ rescan_now = true;
if (sigusr1 != forced_rescan_count)
{
forced_rescan_count = sigusr1;
- rescan_timer = 0;
+ rescan_now = true;
}
- if (rescan_timer == 0)
+ if (rescan_now)
try
{
set_metric("thread_busy", "role","traverse", 1);
- inc_metric("thread_work_total", "role","traverse");
scan_source_paths();
+ last_rescan = time(NULL); // NB: now was before scanning
+ inc_metric("thread_work_total", "role","traverse");
set_metric("thread_busy", "role","traverse", 0);
}
catch (const reportable_exception& e)
{
e.report(cerr);
}
- sleep (1);
- rescan_timer ++;
}
- // wake up any blocked scanning threads so they can check $interrupted and kill themselves
- scanq.nuke();
-
return 0;
}
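The traversal loop is reworked from the old rescan_timer second-counter to wall-clock scheduling: sleep a second, wait until scanners have drained the previous batch, then decide whether to traverse based on last_rescan, the configured period, and any SIGUSR1 that arrived. What follows is a compressed, stand-alone sketch of that scheduling logic under stated simplifications (scan_source_paths is a stub, the flags are plain globals, and last_rescan is recorded after the traversal, mirroring the groomer's last_groom update):

#include <signal.h>
#include <time.h>
#include <unistd.h>
#include <iostream>

// stand-ins for the real globals and for the traversal work
static volatile sig_atomic_t interrupted = 0;
static volatile sig_atomic_t sigusr1 = 0;
static unsigned rescan_s = 300;                       // -t: rescan period in seconds
static void scan_source_paths () { std::cout << "traversing..." << std::endl; }
static void handle_signal (int sig) { if (sig == SIGINT) interrupted = 1; else sigusr1 ++; }

static void traversal_loop ()
{
  sig_atomic_t forced_rescan_count = 0;
  time_t last_rescan = 0;

  while (! interrupted)
    {
      sleep (1);

      time_t now = time (NULL);
      bool rescan_now = false;
      if (last_rescan == 0)                           // always traverse once, even with -t0
        rescan_now = true;
      if (rescan_s > 0 && now > last_rescan + rescan_s)
        rescan_now = true;
      if (sigusr1 != forced_rescan_count)             // SIGUSR1 forces an immediate rescan
        {
          forced_rescan_count = sigusr1;
          rescan_now = true;
        }

      if (rescan_now)
        {
          scan_source_paths ();
          last_rescan = time (NULL);                  // NB: 'now' was taken before the scan
        }
    }
}

int main ()
{
  signal (SIGINT, handle_signal);                     // Ctrl-C ends the loop
  signal (SIGUSR1, handle_signal);                    // kill -USR1 forces a rescan
  traversal_loop ();
  return 0;
}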
@@ -2589,36 +2620,44 @@ void groom()
static void*
thread_main_groom (void* /*arg*/)
{
- unsigned groom_timer = 0;
sig_atomic_t forced_groom_count = 0;
- set_metric("thread_timer_max", "role", "groom", groom_s);
set_metric("thread_tid", "role", "groom", tid());
add_metric("thread_count", "role", "groom", 1);
- while (! interrupted)
+
+ time_t last_groom = 0;
+
+ while (1)
{
- set_metric("thread_timer", "role", "groom", groom_timer);
- // set_metric("thread_forced_total", "role", "groom", forced_groom_count);
- if (groom_s && groom_timer > groom_s)
- groom_timer = 0;
+ sleep (1);
+ scanq.wait_idle(); // PR25394: block scanners during grooming!
+ if (interrupted) break;
+
+ time_t now = time(NULL);
+ bool groom_now = false;
+ if (last_groom == 0) // at least one initial groom is documented even for -g0
+ groom_now = true;
+ if (groom_s > 0 && now > last_groom + groom_s)
+ groom_now = true;
if (sigusr2 != forced_groom_count)
{
forced_groom_count = sigusr2;
- groom_timer = 0;
+ groom_now = true;
}
- if (groom_timer == 0)
+ if (groom_now)
try
{
set_metric("thread_busy", "role", "groom", 1);
- inc_metric("thread_work_total", "role", "groom");
groom ();
+ last_groom = time(NULL); // NB: now was before grooming
+ inc_metric("thread_work_total", "role", "groom");
set_metric("thread_busy", "role", "groom", 0);
}
catch (const sqlite_exception& e)
{
obatched(cerr) << e.message << endl;
}
- sleep (1);
- groom_timer ++;
+
+ scanq.done_idle();
}
return 0;
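Both idler threads use the same wait_idle()/done_idle() pair, but they bracket it differently: the traverser releases the queue immediately, since it only needs to know the previous batch has drained before enqueueing a new one, whereas the groomer keeps scanners locked out for the whole groom() (the PR25394 fix). A small compilable sketch of the two call shapes, with a no-op toy_scanq standing in for the real queue:

// no-op stub with the same idle API as workq<>: wait_idle() blocks until the
// scan queue is empty, done_idle() lets scanners run again
struct toy_scanq
{
  void wait_idle () {}
  void done_idle () {}
};

static toy_scanq scanq;

static void traverse_cycle ()
{
  scanq.wait_idle ();   // wait for scanners to drain the previous traversal's work
  scanq.done_idle ();   // then release them immediately: traversal and scanning overlap
  // scan_source_paths();   enqueues the next batch of work
}

static void groom_cycle ()
{
  scanq.wait_idle ();   // PR25394: hold scanners off...
  // groom();              ...while the database is groomed...
  scanq.done_idle ();   // ...and only then let them resume
}

int main ()
{
  traverse_cycle ();
  groom_cycle ();
  return 0;
}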
@@ -2860,26 +2899,27 @@ main (int argc, char *argv[])
if (du && du[0] != '\0') // set to non-empty string?
obatched(clog) << "upstream debuginfod servers: " << du << endl;
- vector<pthread_t> scanner_threads;
- pthread_t groom_thread;
+ vector<pthread_t> all_threads;
- rc = pthread_create (& groom_thread, NULL, thread_main_groom, NULL);
+ pthread_t pt;
+ rc = pthread_create (& pt, NULL, thread_main_groom, NULL);
if (rc < 0)
error (0, 0, "warning: cannot spawn thread (%d) to groom database\n", rc);
+ else
+ all_threads.push_back(pt);
if (scan_files || scan_archives.size() > 0)
{
- pthread_t pt;
pthread_create (& pt, NULL, thread_main_fts_source_paths, NULL);
if (rc < 0)
error (0, 0, "warning: cannot spawn thread (%d) to traverse source paths\n", rc);
- scanner_threads.push_back(pt);
+ all_threads.push_back(pt);
for (unsigned i=0; i<concurrency; i++)
{
pthread_create (& pt, NULL, thread_main_scanner, NULL);
if (rc < 0)
error (0, 0, "warning: cannot spawn thread (%d) to scan source files / archives\n", rc);
- scanner_threads.push_back(pt);
+ all_threads.push_back(pt);
}
}
@@ -2887,15 +2927,15 @@ main (int argc, char *argv[])
set_metric("ready", 1);
while (! interrupted)
pause ();
+ scanq.nuke(); // wake up any remaining scanq-related threads, let them die
set_metric("ready", 0);
if (verbose)
obatched(clog) << "stopping" << endl;
/* Join all our threads. */
- for (auto&& it : scanner_threads)
+ for (auto&& it : all_threads)
pthread_join (it, NULL);
- pthread_join (groom_thread, NULL);
/* Stop all the web service threads. */
if (d4) MHD_stop_daemon (d4);
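main() now collects every thread handle, including the groomer's, into a single all_threads vector, wakes any thread still blocked on the queue with scanq.nuke() once interrupted, and joins them all in one loop. Below is a reduced sketch of that shutdown sequence; worker() and the commented queue call are placeholders, and since pthread_create reports failure as a nonzero (positive) error code, the sketch checks rc != 0 rather than rc < 0:

#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <vector>

// placeholder worker; the real threads block on scanq until nuke() wakes them
static void* worker (void*)
{
  sleep (1);
  return NULL;
}

int main ()
{
  std::vector<pthread_t> all_threads;

  for (int i = 0; i < 4; i++)
    {
      pthread_t pt;
      int rc = pthread_create (&pt, NULL, worker, NULL);
      if (rc != 0)                       // pthread_create returns an errno-style code
        fprintf (stderr, "warning: cannot spawn thread (%d)\n", rc);
      else
        all_threads.push_back (pt);      // only ever join threads that really started
    }

  // ... run until interrupted ...
  // scanq.nuke();   // in debuginfod: wake any thread still blocked on the workqueue

  for (auto& t : all_threads)
    pthread_join (t, NULL);
  return 0;
}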
diff --git a/tests/ChangeLog b/tests/ChangeLog
index 8936e410..8b032041 100644
--- a/tests/ChangeLog
+++ b/tests/ChangeLog
@@ -1,3 +1,7 @@
+2020-01-19 Frank Ch. Eigler <fche@redhat.com>
+
+ * run-debuginfod-find.sh: Check for proper groom completion count.
+
2020-01-18 Frank Ch. Eigler <fche@redhat.com>
* run-debuginfod-find.sh: Test empty source_paths[].
diff --git a/tests/run-debuginfod-find.sh b/tests/run-debuginfod-find.sh
index 2a3d591b..33bbd1a6 100755
--- a/tests/run-debuginfod-find.sh
+++ b/tests/run-debuginfod-find.sh
@@ -257,6 +257,7 @@ RPM_BUILDID=d44d42cbd7d915bc938c81333a21e355a6022fb7 # in rhel6/ subdir, for a l
rm -r R/debuginfod-rpms/rhel6/*
kill -USR2 $PID1 # groom cycle
# Expect 3 rpms to be deleted by the groom
+wait_ready $PORT1 'thread_work_total{role="groom"}' 1
wait_ready $PORT1 'groom{statistic="file d/e"}' 3
rm -rf $DEBUGINFOD_CACHE_PATH # clean it from previous tests