diff options
-rw-r--r-- | config/initializers/sidekiq.rb | 2 | ||||
-rw-r--r-- | doc/administration/troubleshooting/sidekiq.md | 2 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_daemon/monitor.rb | 184 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_middleware/monitor.rb | 4 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_monitor.rb | 182 | ||||
-rw-r--r-- | spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb (renamed from spec/lib/gitlab/sidekiq_monitor_spec.rb) | 4 | ||||
-rw-r--r-- | spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb | 6 |
7 files changed, 193 insertions, 191 deletions
diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 9f3e104bc2b..20f31ff6810 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -60,7 +60,7 @@ Sidekiq.configure_server do |config| # Sidekiq (e.g. in an initializer). ActiveRecord::Base.clear_all_connections! - Gitlab::SidekiqMonitor.instance.start if enable_sidekiq_monitor + Gitlab::SidekiqDaemon::Monitor.instance.start if enable_sidekiq_monitor end if enable_reliable_fetch? diff --git a/doc/administration/troubleshooting/sidekiq.md b/doc/administration/troubleshooting/sidekiq.md index c41edb5dbfc..fdafac8420e 100644 --- a/doc/administration/troubleshooting/sidekiq.md +++ b/doc/administration/troubleshooting/sidekiq.md @@ -270,7 +270,7 @@ is interrupted mid-execution and it is not guaranteed that proper rollback of transactions is implemented. ```ruby -Gitlab::SidekiqMonitor.cancel_job('job-id') +Gitlab::SidekiqDaemon::Monitor.cancel_job('job-id') ``` > This requires the Sidekiq to be run with `SIDEKIQ_MONITOR_WORKER=1` diff --git a/lib/gitlab/sidekiq_daemon/monitor.rb b/lib/gitlab/sidekiq_daemon/monitor.rb new file mode 100644 index 00000000000..bbfca130425 --- /dev/null +++ b/lib/gitlab/sidekiq_daemon/monitor.rb @@ -0,0 +1,184 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqDaemon + class Monitor < Daemon + include ::Gitlab::Utils::StrongMemoize + + NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications' + CANCEL_DEADLINE = 24.hours.seconds + RECONNECT_TIME = 3.seconds + + # We use exception derived from `Exception` + # to consider this as an very low-level exception + # that should not be caught by application + CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException + + attr_reader :jobs_thread + attr_reader :jobs_mutex + + def initialize + super + + @jobs_thread = {} + @jobs_mutex = Mutex.new + end + + def within_job(jid, queue) + jobs_mutex.synchronize do + jobs_thread[jid] = Thread.current + end + + if cancelled?(jid) + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'run', + queue: queue, + jid: jid, + canceled: true + ) + raise CancelledError + end + + yield + ensure + jobs_mutex.synchronize do + jobs_thread.delete(jid) + end + end + + def self.cancel_job(jid) + payload = { + action: 'cancel', + jid: jid + }.to_json + + ::Gitlab::Redis::SharedState.with do |redis| + redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1) + redis.publish(NOTIFICATION_CHANNEL, payload) + end + end + + private + + def start_working + Sidekiq.logger.info( + class: self.class.to_s, + action: 'start', + message: 'Starting Monitor Daemon' + ) + + while enabled? + process_messages + sleep(RECONNECT_TIME) + end + + ensure + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'stop', + message: 'Stopping Monitor Daemon' + ) + end + + def stop_working + thread.raise(Interrupt) if thread.alive? + end + + def process_messages + ::Gitlab::Redis::SharedState.with do |redis| + redis.subscribe(NOTIFICATION_CHANNEL) do |on| + on.message do |channel, message| + process_message(message) + end + end + end + rescue Exception => e # rubocop:disable Lint/RescueException + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'exception', + message: e.message + ) + + # we re-raise system exceptions + raise e unless e.is_a?(StandardError) + end + + def process_message(message) + Sidekiq.logger.info( + class: self.class.to_s, + channel: NOTIFICATION_CHANNEL, + message: 'Received payload on channel', + payload: message + ) + + message = safe_parse(message) + return unless message + + case message['action'] + when 'cancel' + process_job_cancel(message['jid']) + else + # unknown message + end + end + + def safe_parse(message) + JSON.parse(message) + rescue JSON::ParserError + end + + def process_job_cancel(jid) + return unless jid + + # try to find thread without lock + return unless find_thread_unsafe(jid) + + Thread.new do + # try to find a thread, but with guaranteed + # that handle for thread corresponds to actually + # running job + find_thread_with_lock(jid) do |thread| + Sidekiq.logger.warn( + class: self.class.to_s, + action: 'cancel', + message: 'Canceling thread with CancelledError', + jid: jid, + thread_id: thread.object_id + ) + + thread&.raise(CancelledError) + end + end + end + + # This method needs to be thread-safe + # This is why it passes thread in block, + # to ensure that we do process this thread + def find_thread_unsafe(jid) + jobs_thread[jid] + end + + def find_thread_with_lock(jid) + # don't try to lock if we cannot find the thread + return unless find_thread_unsafe(jid) + + jobs_mutex.synchronize do + find_thread_unsafe(jid).tap do |thread| + yield(thread) if thread + end + end + end + + def cancelled?(jid) + ::Gitlab::Redis::SharedState.with do |redis| + redis.exists(self.class.cancel_job_key(jid)) + end + end + + def self.cancel_job_key(jid) + "sidekiq:cancel:#{jid}" + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/monitor.rb b/lib/gitlab/sidekiq_middleware/monitor.rb index 53a6132edac..00965bf5506 100644 --- a/lib/gitlab/sidekiq_middleware/monitor.rb +++ b/lib/gitlab/sidekiq_middleware/monitor.rb @@ -4,10 +4,10 @@ module Gitlab module SidekiqMiddleware class Monitor def call(worker, job, queue) - Gitlab::SidekiqMonitor.instance.within_job(job['jid'], queue) do + Gitlab::SidekiqDaemon::Monitor.instance.within_job(job['jid'], queue) do yield end - rescue Gitlab::SidekiqMonitor::CancelledError + rescue Gitlab::SidekiqDaemon::Monitor::CancelledError # push job to DeadSet payload = ::Sidekiq.dump_json(job) ::Sidekiq::DeadSet.new.kill(payload, notify_failure: false) diff --git a/lib/gitlab/sidekiq_monitor.rb b/lib/gitlab/sidekiq_monitor.rb deleted file mode 100644 index a58b33534bf..00000000000 --- a/lib/gitlab/sidekiq_monitor.rb +++ /dev/null @@ -1,182 +0,0 @@ -# frozen_string_literal: true - -module Gitlab - class SidekiqMonitor < Daemon - include ::Gitlab::Utils::StrongMemoize - - NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications' - CANCEL_DEADLINE = 24.hours.seconds - RECONNECT_TIME = 3.seconds - - # We use exception derived from `Exception` - # to consider this as an very low-level exception - # that should not be caught by application - CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException - - attr_reader :jobs_thread - attr_reader :jobs_mutex - - def initialize - super - - @jobs_thread = {} - @jobs_mutex = Mutex.new - end - - def within_job(jid, queue) - jobs_mutex.synchronize do - jobs_thread[jid] = Thread.current - end - - if cancelled?(jid) - Sidekiq.logger.warn( - class: self.class.to_s, - action: 'run', - queue: queue, - jid: jid, - canceled: true - ) - raise CancelledError - end - - yield - ensure - jobs_mutex.synchronize do - jobs_thread.delete(jid) - end - end - - def self.cancel_job(jid) - payload = { - action: 'cancel', - jid: jid - }.to_json - - ::Gitlab::Redis::SharedState.with do |redis| - redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1) - redis.publish(NOTIFICATION_CHANNEL, payload) - end - end - - private - - def start_working - Sidekiq.logger.info( - class: self.class.to_s, - action: 'start', - message: 'Starting Monitor Daemon' - ) - - while enabled? - process_messages - sleep(RECONNECT_TIME) - end - - ensure - Sidekiq.logger.warn( - class: self.class.to_s, - action: 'stop', - message: 'Stopping Monitor Daemon' - ) - end - - def stop_working - thread.raise(Interrupt) if thread.alive? - end - - def process_messages - ::Gitlab::Redis::SharedState.with do |redis| - redis.subscribe(NOTIFICATION_CHANNEL) do |on| - on.message do |channel, message| - process_message(message) - end - end - end - rescue Exception => e # rubocop:disable Lint/RescueException - Sidekiq.logger.warn( - class: self.class.to_s, - action: 'exception', - message: e.message - ) - - # we re-raise system exceptions - raise e unless e.is_a?(StandardError) - end - - def process_message(message) - Sidekiq.logger.info( - class: self.class.to_s, - channel: NOTIFICATION_CHANNEL, - message: 'Received payload on channel', - payload: message - ) - - message = safe_parse(message) - return unless message - - case message['action'] - when 'cancel' - process_job_cancel(message['jid']) - else - # unknown message - end - end - - def safe_parse(message) - JSON.parse(message) - rescue JSON::ParserError - end - - def process_job_cancel(jid) - return unless jid - - # try to find thread without lock - return unless find_thread_unsafe(jid) - - Thread.new do - # try to find a thread, but with guaranteed - # that handle for thread corresponds to actually - # running job - find_thread_with_lock(jid) do |thread| - Sidekiq.logger.warn( - class: self.class.to_s, - action: 'cancel', - message: 'Canceling thread with CancelledError', - jid: jid, - thread_id: thread.object_id - ) - - thread&.raise(CancelledError) - end - end - end - - # This method needs to be thread-safe - # This is why it passes thread in block, - # to ensure that we do process this thread - def find_thread_unsafe(jid) - jobs_thread[jid] - end - - def find_thread_with_lock(jid) - # don't try to lock if we cannot find the thread - return unless find_thread_unsafe(jid) - - jobs_mutex.synchronize do - find_thread_unsafe(jid).tap do |thread| - yield(thread) if thread - end - end - end - - def cancelled?(jid) - ::Gitlab::Redis::SharedState.with do |redis| - redis.exists(self.class.cancel_job_key(jid)) - end - end - - def self.cancel_job_key(jid) - "sidekiq:cancel:#{jid}" - end - end -end diff --git a/spec/lib/gitlab/sidekiq_monitor_spec.rb b/spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb index bbd7bf90217..acbb09e3542 100644 --- a/spec/lib/gitlab/sidekiq_monitor_spec.rb +++ b/spec/lib/gitlab/sidekiq_daemon/monitor_spec.rb @@ -2,7 +2,7 @@ require 'spec_helper' -describe Gitlab::SidekiqMonitor do +describe Gitlab::SidekiqDaemon::Monitor do let(:monitor) { described_class.new } describe '#within_job' do @@ -43,7 +43,7 @@ describe Gitlab::SidekiqMonitor do before do # we want to run at most once cycle # we toggle `enabled?` flag after the first call - stub_const('Gitlab::SidekiqMonitor::RECONNECT_TIME', 0) + stub_const('Gitlab::SidekiqDaemon::Monitor::RECONNECT_TIME', 0) allow(monitor).to receive(:enabled?).and_return(true, false) allow(Sidekiq.logger).to receive(:info) diff --git a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb index 7319cdc2399..023df1a6391 100644 --- a/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/monitor_spec.rb @@ -10,8 +10,8 @@ describe Gitlab::SidekiqMiddleware::Monitor do let(:job) { { 'jid' => 'job-id' } } let(:queue) { 'my-queue' } - it 'calls SidekiqMonitor' do - expect(Gitlab::SidekiqMonitor.instance).to receive(:within_job) + it 'calls Gitlab::SidekiqDaemon::Monitor' do + expect(Gitlab::SidekiqDaemon::Monitor.instance).to receive(:within_job) .with('job-id', 'my-queue') .and_call_original @@ -29,7 +29,7 @@ describe Gitlab::SidekiqMiddleware::Monitor do context 'when cancel happens' do subject do monitor.call(worker, job, queue) do - raise Gitlab::SidekiqMonitor::CancelledError + raise Gitlab::SidekiqDaemon::Monitor::CancelledError end end |