diff options
author | Kamil TrzciĆski <ayufan@ayufan.eu> | 2019-08-14 17:56:37 +0200 |
---|---|---|
committer | Qingyu Zhao <qzhao@gitlab.com> | 2019-08-21 18:50:46 +1000 |
commit | 75e2302d0126c4bc8ea215ffb4e72612d44e73bb (patch) | |
tree | 9b7bb2eb248080aab20cd8de15cf73ceb8b97dd8 | |
parent | ca622a3e13cf88d94c6b3c98554e9782d37d4ad5 (diff) | |
download | gitlab-ce-75e2302d0126c4bc8ea215ffb4e72612d44e73bb.tar.gz |
Allow to interrupt running jobs
This adds a middleware to track all threads
for running jobs.
This makes sidekiq to watch for redis-delivered notifications.
This makes be able to send notification to interrupt
running sidekiq jobs.
This does not take into account any native code,
as `Thread.raise` generates exception once the control gets
back to Ruby.
The separate measure should be taken to interrupt gRPC, shellouts,
or anything else that escapes Ruby.
-rw-r--r-- | config/initializers/sidekiq.rb | 3 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_middleware/jobs_threads.rb | 49 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_status/monitor.rb | 46 | ||||
-rw-r--r-- | spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb | 83 |
4 files changed, 181 insertions, 0 deletions
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 7217f098fd9..b05d4342e23 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -33,6 +33,7 @@ Sidekiq.configure_server do |config| config.redis = queues_config_hash config.server_middleware do |chain| + chain.add Gitlab::SidekiqMiddleware::JobsThreads unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS'] chain.add Gitlab::SidekiqMiddleware::Metrics if Settings.monitoring.sidekiq_exporter chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] @@ -57,6 +58,8 @@ Sidekiq.configure_server do |config| # Clear any connections that might have been obtained before starting # Sidekiq (e.g. in an initializer). ActiveRecord::Base.clear_all_connections! + + Gitlab::SidekiqStatus::Monitor.instance.start unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS'] end if enable_reliable_fetch? diff --git a/lib/gitlab/sidekiq_middleware/jobs_threads.rb b/lib/gitlab/sidekiq_middleware/jobs_threads.rb new file mode 100644 index 00000000000..d0603bcee2d --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/jobs_threads.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + class JobsThreads + @@jobs = {} # rubocop:disable Style/ClassVars + MUTEX = Mutex.new + + def call(worker, job, queue) + jid = job['jid'] + + MUTEX.synchronize do + @@jobs[jid] = Thread.current + end + + return if self.class.cancelled?(jid) + + yield + ensure + MUTEX.synchronize do + @@jobs.delete(jid) + end + end + + def self.interrupt(jid) + MUTEX.synchronize do + thread = @@jobs[jid] + break unless thread + + thread.raise(Interrupt) + thread + end + end + + def self.cancelled?(jid) + Sidekiq.redis {|c| c.exists("cancelled-#{jid}") } + end + + def self.mark_job_as_cancelled(jid) + Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 86400, 1) } + "Marked job as cancelled(if Sidekiq retry within 24 hours, the job will be skipped as `processed`). Jid: #{jid}" + end + + def self.jobs + @@jobs + end + end + end +end diff --git a/lib/gitlab/sidekiq_status/monitor.rb b/lib/gitlab/sidekiq_status/monitor.rb new file mode 100644 index 00000000000..3fd9f02b166 --- /dev/null +++ b/lib/gitlab/sidekiq_status/monitor.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqStatus + class Monitor < Daemon + include ::Gitlab::Utils::StrongMemoize + + NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze + + def start_working + Sidekiq.logger.info "Watching sidekiq monitor" + + ::Gitlab::Redis::SharedState.with do |redis| + redis.subscribe(NOTIFICATION_CHANNEL) do |on| + on.message do |channel, message| + Sidekiq.logger.info "Received #{message} on #{channel}..." + execute_job_cancel(message) + end + end + end + end + + def self.cancel_job(jid) + Gitlab::Redis::SharedState.with do |redis| + redis.publish(NOTIFICATION_CHANNEL, jid) + "Notification sent. Job should be cancelled soon. Check log to confirm. Jid: #{jid}" + end + end + + private + + def execute_job_cancel(jid) + Gitlab::SidekiqMiddleware::JobsThreads.mark_job_as_cancelled(jid) + + thread = Gitlab::SidekiqMiddleware::JobsThreads + .interrupt(jid) + + if thread + Sidekiq.logger.info "Interrupted thread: #{thread} for #{jid}." + else + Sidekiq.logger.info "Did not find thread for #{jid}." + end + end + end + end +end diff --git a/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb b/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb new file mode 100644 index 00000000000..58cae3e42e0 --- /dev/null +++ b/spec/lib/gitlab/sidekiq_middleware/jobs_threads_spec.rb @@ -0,0 +1,83 @@ +require 'spec_helper' + +describe Gitlab::SidekiqMiddleware::JobsThreads do + subject { described_class.new } + + let(:worker) { double(:worker, class: Chaos::SleepWorker) } + let(:jid) { '581f90fbd2f24deabcbde2f9' } + let(:job) { { 'jid' => jid } } + let(:jid_thread) { '684f90fbd2f24deabcbde2f9' } + let(:job_thread) { { 'jid' => jid_thread } } + let(:queue) { 'test_queue' } + let(:mark_job_as_cancelled) { Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 2, 1) } } + + def run_job + subject.call(worker, job, queue) do + sleep 2 + "mock return from yield" + end + end + + def run_job_thread + Thread.new do + subject.call(worker, job_thread, queue) do + sleep 3 + "mock return from yield" + end + end + end + + describe '.call' do + context 'by default' do + it 'return from yield' do + expect(run_job).to eq("mock return from yield") + end + end + + context 'when job is marked as cancelled' do + before do + mark_job_as_cancelled + end + + it 'return directly' do + expect(run_job).to be_nil + end + end + end + + describe '.self.interrupt' do + before do + run_job_thread + sleep 1 + end + + it 'interrupt the job with correct jid' do + expect(described_class.jobs[jid_thread]).to receive(:raise).with(Interrupt) + expect(described_class.interrupt jid_thread).to eq(described_class.jobs[jid_thread]) + end + + it 'do nothing with wrong jid' do + expect(described_class.jobs[jid_thread]).not_to receive(:raise) + expect(described_class.interrupt 'wrong_jid').to be_nil + end + end + + describe '.self.cancelled?' do + it 'return true when job is marked as cancelled' do + mark_job_as_cancelled + expect(described_class.cancelled? jid).to be true + end + + it 'return false when job is not marked as cancelled' do + expect(described_class.cancelled? 'non-exists-jid').to be false + end + end + + describe '.self.mark_job_as_cancelled' do + it 'set Redis key' do + described_class.mark_job_as_cancelled('jid_123') + + expect(described_class.cancelled? 'jid_123').to be true + end + end +end |