author    | Douwe Maan <douwe@selenight.nl> | 2017-11-29 16:30:17 +0100
committer | Douwe Maan <douwe@selenight.nl> | 2017-12-05 11:59:39 +0100
commit    | 1e6ca3c41ead23c5e433460c8c807ea73d9ec0ef (patch)
tree      | ed6a5da0def848adc1a15f80e69d9c55651895a4
parent    | a5c3f1c8ff7da20183b172b2b0693a6010c9e86d (diff)
download  | gitlab-ce-dm-application-worker.tar.gz
Consistently schedule Sidekiq jobs (dm-application-worker)
23 files changed, 107 insertions, 100 deletions
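
For orientation before the raw diff: the commit replaces direct `Sidekiq::Client.enqueue`/`push_bulk` calls with class methods on `ApplicationWorker`. A minimal sketch of the resulting scheduling interface, assuming a hypothetical `ExampleWorker` that is not part of this commit:

```ruby
# Hypothetical worker used only for illustration; any class that includes
# ApplicationWorker gains the bulk scheduling class methods added below.
class ExampleWorker
  include ApplicationWorker

  def perform(record_id)
    # do the work for one record
  end
end

# Single job (replaces Sidekiq::Client.enqueue in Service and WebHookService).
ExampleWorker.perform_async(42)

# Many jobs in a single Redis round trip (replaces the per-worker
# bulk_perform_async/perform_bulk helpers this commit deletes).
ExampleWorker.bulk_perform_async([[1], [2], [3]])

# The same, scheduled to start after a delay; raises ArgumentError when the
# delay does not resolve to a future timestamp.
ExampleWorker.bulk_perform_in(5.minutes, [[1], [2], [3]])
```
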
diff --git a/app/models/group.rb b/app/models/group.rb
index 27287f079a4..505e943e464 100644
--- a/app/models/group.rb
+++ b/app/models/group.rb
@@ -2,6 +2,7 @@ require 'carrierwave/orm/activerecord'
 
 class Group < Namespace
   include Gitlab::ConfigHelper
+  include AfterCommitQueue
   include AccessRequestable
   include Avatarable
   include Referable
diff --git a/app/models/key.rb b/app/models/key.rb
index 815fd1de909..a3f8a5d6dc7 100644
--- a/app/models/key.rb
+++ b/app/models/key.rb
@@ -2,6 +2,7 @@ require 'digest/md5'
 
 class Key < ActiveRecord::Base
   include Gitlab::CurrentSettings
+  include AfterCommitQueue
   include Sortable
 
   belongs_to :user
diff --git a/app/models/member.rb b/app/models/member.rb
index cbbd58f2eaf..2fe5fda985f 100644
--- a/app/models/member.rb
+++ b/app/models/member.rb
@@ -1,4 +1,5 @@
 class Member < ActiveRecord::Base
+  include AfterCommitQueue
   include Sortable
   include Importable
   include Expirable
diff --git a/app/models/service.rb b/app/models/service.rb
index fdd2605e3e3..3c4f1885dd0 100644
--- a/app/models/service.rb
+++ b/app/models/service.rb
@@ -211,7 +211,7 @@ class Service < ActiveRecord::Base
   def async_execute(data)
     return unless supported_events.include?(data[:object_kind])
 
-    Sidekiq::Client.enqueue(ProjectServiceWorker, id, data)
+    ProjectServiceWorker.perform_async(id, data)
   end
 
   def issue_tracker?
diff --git a/app/models/user.rb b/app/models/user.rb
index 76fd395be9a..38ee4ed50c1 100644
--- a/app/models/user.rb
+++ b/app/models/user.rb
@@ -7,6 +7,7 @@ class User < ActiveRecord::Base
   include Gitlab::ConfigHelper
   include Gitlab::CurrentSettings
   include Gitlab::SQL::Pattern
+  include AfterCommitQueue
   include Avatarable
   include Referable
   include Sortable
@@ -903,6 +904,7 @@ class User < ActiveRecord::Base
 
   def post_destroy_hook
     log_info("User \"#{name}\" (#{email}) was removed")
+
     system_hook_service.execute_hooks_for(self, :destroy)
   end
 
diff --git a/app/services/system_hooks_service.rb b/app/services/system_hooks_service.rb
index 911cc919bb8..690918b4a00 100644
--- a/app/services/system_hooks_service.rb
+++ b/app/services/system_hooks_service.rb
@@ -1,6 +1,10 @@
 class SystemHooksService
   def execute_hooks_for(model, event)
-    execute_hooks(build_event_data(model, event))
+    data = build_event_data(model, event)
+
+    model.run_after_commit_or_now do
+      SystemHooksService.new.execute_hooks(data)
+    end
   end
 
   def execute_hooks(data, hooks_scope = :all)
diff --git a/app/services/web_hook_service.rb b/app/services/web_hook_service.rb
index cd99e0b90f9..6ebc7c89500 100644
--- a/app/services/web_hook_service.rb
+++ b/app/services/web_hook_service.rb
@@ -63,7 +63,7 @@ class WebHookService
   end
 
   def async_execute
-    Sidekiq::Client.enqueue(WebHookWorker, hook.id, data, hook_name)
+    WebHookWorker.perform_async(hook.id, data, hook_name)
   end
 
   private
diff --git a/app/workers/authorized_projects_worker.rb b/app/workers/authorized_projects_worker.rb
index d4f334ec3b8..09559e3b696 100644
--- a/app/workers/authorized_projects_worker.rb
+++ b/app/workers/authorized_projects_worker.rb
@@ -16,11 +16,6 @@ class AuthorizedProjectsWorker
     waiter.wait
   end
 
-  # Schedules multiple jobs to run in sidekiq without waiting for completion
-  def self.bulk_perform_async(args_list)
-    Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
-  end
-
   # Performs multiple jobs directly. Failed jobs will be put into sidekiq so
   # they can benefit from retries
   def self.bulk_perform_inline(args_list)
diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb
index 65791c4eaba..aeb3bc019b9 100644
--- a/app/workers/background_migration_worker.rb
+++ b/app/workers/background_migration_worker.rb
@@ -1,34 +1,6 @@
 class BackgroundMigrationWorker
   include ApplicationWorker
 
-  # Enqueues a number of jobs in bulk.
-  #
-  # The `jobs` argument should be an Array of Arrays, each sub-array must be in
-  # the form:
-  #
-  # [migration-class, [arg1, arg2, ...]]
-  def self.perform_bulk(jobs)
-    Sidekiq::Client.push_bulk('class' => self,
-                              'queue' => sidekiq_options['queue'],
-                              'args' => jobs)
-  end
-
-  # Schedules multiple jobs in bulk, with a delay.
-  #
-  def self.perform_bulk_in(delay, jobs)
-    now = Time.now.to_i
-    schedule = now + delay.to_i
-
-    if schedule <= now
-      raise ArgumentError, 'The schedule time must be in the future!'
-    end
-
-    Sidekiq::Client.push_bulk('class' => self,
-                              'queue' => sidekiq_options['queue'],
-                              'args' => jobs,
-                              'at' => schedule)
-  end
-
   # Performs the background migration.
   #
   # See Gitlab::BackgroundMigration.perform for more information.
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index bf1ecaa0c6d..9c3bdabc49e 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -21,5 +21,20 @@ module ApplicationWorker
     def queue
       get_sidekiq_options['queue'].to_s
     end
+
+    def bulk_perform_async(args_list)
+      Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+    end
+
+    def bulk_perform_in(delay, args_list)
+      now = Time.now.to_i
+      schedule = now + delay.to_i
+
+      if schedule <= now
+        raise ArgumentError, 'The schedule time must be in the future!'
+      end
+
+      Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
+    end
   end
 end
diff --git a/app/workers/expire_build_artifacts_worker.rb b/app/workers/expire_build_artifacts_worker.rb
index 73ab4211080..87e5dca01fd 100644
--- a/app/workers/expire_build_artifacts_worker.rb
+++ b/app/workers/expire_build_artifacts_worker.rb
@@ -8,6 +8,6 @@ class ExpireBuildArtifactsWorker
     build_ids = Ci::Build.with_expired_artifacts.pluck(:id)
     build_ids = build_ids.map { |build_id| [build_id] }
 
-    Sidekiq::Client.push_bulk('class' => ExpireBuildInstanceArtifactsWorker, 'args' => build_ids )
+    ExpireBuildInstanceArtifactsWorker.bulk_perform_async(build_ids)
   end
 end
diff --git a/app/workers/namespaceless_project_destroy_worker.rb b/app/workers/namespaceless_project_destroy_worker.rb
index 13d750e5876..adb25c2a170 100644
--- a/app/workers/namespaceless_project_destroy_worker.rb
+++ b/app/workers/namespaceless_project_destroy_worker.rb
@@ -8,10 +8,6 @@ class NamespacelessProjectDestroyWorker
   include ApplicationWorker
   include ExceptionBacktrace
 
-  def self.bulk_perform_async(args_list)
-    Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
-  end
-
   def perform(project_id)
     begin
       project = Project.unscoped.find(project_id)
diff --git a/config/initializers/forbid_sidekiq_in_transactions.rb b/config/initializers/forbid_sidekiq_in_transactions.rb
index a78711fe599..bedd57ede04 100644
--- a/config/initializers/forbid_sidekiq_in_transactions.rb
+++ b/config/initializers/forbid_sidekiq_in_transactions.rb
@@ -13,20 +13,19 @@ module Sidekiq
     module ClassMethods
       module NoSchedulingFromTransactions
-        NESTING = ::Rails.env.test? ? 1 : 0
-
         %i(perform_async perform_at perform_in).each do |name|
           define_method(name) do |*args|
-            return super(*args) if Sidekiq::Worker.skip_transaction_check
-            return super(*args) unless ActiveRecord::Base.connection.open_transactions > NESTING
+            if !Sidekiq::Worker.skip_transaction_check && AfterCommitQueue.inside_transaction?
+              raise <<-MSG.strip_heredoc
+                `#{self}.#{name}` cannot be called inside a transaction as this can lead to
+                race conditions when the worker runs before the transaction is committed and
+                tries to access a model that has not been saved yet.
 
-            raise <<-MSG.strip_heredoc
-              `#{self}.#{name}` cannot be called inside a transaction as this can lead to
-              race conditions when the worker runs before the transaction is committed and
-              tries to access a model that has not been saved yet.
+                Use an `after_commit` hook, or include `AfterCommitQueue` and use a `run_after_commit` block instead.
+              MSG
+            end
 
-              Use an `after_commit` hook, or include `AfterCommitQueue` and use a `run_after_commit` block instead.
-            MSG
+            super(*args)
           end
         end
       end
 
diff --git a/db/post_migrate/20170627101016_schedule_event_migrations.rb b/db/post_migrate/20170627101016_schedule_event_migrations.rb
index 1f34375ff0d..1e020d05f78 100644
--- a/db/post_migrate/20170627101016_schedule_event_migrations.rb
+++ b/db/post_migrate/20170627101016_schedule_event_migrations.rb
@@ -25,14 +25,14 @@ class ScheduleEventMigrations < ActiveRecord::Migration
         # We push multiple jobs at a time to reduce the time spent in
         # Sidekiq/Redis operations. We're using this buffer based approach so we
         # don't need to run additional queries for every range.
-        BackgroundMigrationWorker.perform_bulk(jobs)
+        BackgroundMigrationWorker.bulk_perform_async(jobs)
         jobs.clear
       end
 
       jobs << ['MigrateEventsToPushEventPayloads', [min, max]]
     end
 
-    BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty?
+    BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
   end
 
   def down
diff --git a/db/post_migrate/20171005130944_schedule_create_gpg_key_subkeys_from_gpg_keys.rb b/db/post_migrate/20171005130944_schedule_create_gpg_key_subkeys_from_gpg_keys.rb
index 01d56fbd490..467c584c2e0 100644
--- a/db/post_migrate/20171005130944_schedule_create_gpg_key_subkeys_from_gpg_keys.rb
+++ b/db/post_migrate/20171005130944_schedule_create_gpg_key_subkeys_from_gpg_keys.rb
@@ -19,7 +19,7 @@ class ScheduleCreateGpgKeySubkeysFromGpgKeys < ActiveRecord::Migration
       [MIGRATION, [id]]
     end
 
-    BackgroundMigrationWorker.perform_bulk(jobs)
+    BackgroundMigrationWorker.bulk_perform_async(jobs)
   end
 end
 
diff --git a/doc/development/background_migrations.md b/doc/development/background_migrations.md
index 5452b0e7a2f..fd2b9d0e908 100644
--- a/doc/development/background_migrations.md
+++ b/doc/development/background_migrations.md
@@ -68,10 +68,10 @@ BackgroundMigrationWorker.perform_async('BackgroundMigrationClassName', [arg1, a
 ```
 
 Usually it's better to enqueue jobs in bulk, for this you can use
-`BackgroundMigrationWorker.perform_bulk`:
+`BackgroundMigrationWorker.bulk_perform_async`:
 
 ```ruby
-BackgroundMigrationWorker.perform_bulk(
+BackgroundMigrationWorker.bulk_perform_async(
   [['BackgroundMigrationClassName', [1]],
    ['BackgroundMigrationClassName', [2]]]
 )
@@ -85,13 +85,13 @@ updates. Removals in turn can be handled by simply defining foreign keys with
 cascading deletes.
 
 If you would like to schedule jobs in bulk with a delay, you can use
-`BackgroundMigrationWorker.perform_bulk_in`:
+`BackgroundMigrationWorker.bulk_perform_in`:
 
 ```ruby
 jobs = [['BackgroundMigrationClassName', [1]],
         ['BackgroundMigrationClassName', [2]]]
 
-BackgroundMigrationWorker.perform_bulk_in(5.minutes, jobs)
+BackgroundMigrationWorker.bulk_perform_in(5.minutes, jobs)
 ```
 
 ## Cleaning Up
@@ -201,7 +201,7 @@ class ScheduleExtractServicesUrl < ActiveRecord::Migration
         ['ExtractServicesUrl', [id]]
       end
 
-      BackgroundMigrationWorker.perform_bulk(jobs)
+      BackgroundMigrationWorker.bulk_perform_async(jobs)
     end
   end
 
diff --git a/lib/after_commit_queue.rb b/lib/after_commit_queue.rb
index 4750a2c373a..db63c5038ae 100644
--- a/lib/after_commit_queue.rb
+++ b/lib/after_commit_queue.rb
@@ -6,12 +6,34 @@ module AfterCommitQueue
     after_rollback :_clear_after_commit_queue
   end
 
-  def run_after_commit(method = nil, &block)
-    _after_commit_queue << proc { self.send(method) } if method # rubocop:disable GitlabSecurity/PublicSend
+  def run_after_commit(&block)
     _after_commit_queue << block if block
+
+    true
+  end
+
+  def run_after_commit_or_now(&block)
+    if AfterCommitQueue.inside_transaction?
+      run_after_commit(&block)
+    else
+      instance_eval(&block)
+    end
+
     true
   end
 
+  def self.open_transactions_baseline
+    if ::Rails.env.test?
+      return DatabaseCleaner.connections.count { |conn| conn.strategy.is_a?(DatabaseCleaner::ActiveRecord::Transaction) }
+    end
+
+    0
+  end
+
+  def self.inside_transaction?
+    ActiveRecord::Base.connection.open_transactions > open_transactions_baseline
+  end
+
   protected
 
   def _run_after_commit_queue
diff --git a/lib/gitlab/database/migration_helpers.rb b/lib/gitlab/database/migration_helpers.rb
index c276c3566b4..3f65bc912de 100644
--- a/lib/gitlab/database/migration_helpers.rb
+++ b/lib/gitlab/database/migration_helpers.rb
@@ -703,14 +703,14 @@ into similar problems in the future (e.g. when new tables are created).
           # We push multiple jobs at a time to reduce the time spent in
           # Sidekiq/Redis operations. We're using this buffer based approach so we
           # don't need to run additional queries for every range.
-          BackgroundMigrationWorker.perform_bulk(jobs)
+          BackgroundMigrationWorker.bulk_perform_async(jobs)
           jobs.clear
         end
 
         jobs << [job_class_name, [start_id, end_id]]
       end
 
-      BackgroundMigrationWorker.perform_bulk(jobs) unless jobs.empty?
+      BackgroundMigrationWorker.bulk_perform_async(jobs) unless jobs.empty?
     end
 
     # Queues background migration jobs for an entire table, batched by ID range.
diff --git a/spec/lib/gitlab/database/migration_helpers_spec.rb b/spec/lib/gitlab/database/migration_helpers_spec.rb
index 3c8350b3aad..664ba0f7234 100644
--- a/spec/lib/gitlab/database/migration_helpers_spec.rb
+++ b/spec/lib/gitlab/database/migration_helpers_spec.rb
@@ -942,8 +942,8 @@ describe Gitlab::Database::MigrationHelpers do
     end
 
     it 'queues jobs in groups of buffer size 1' do
-      expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]]])
-      expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id3, id3]]])
+      expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]]])
+      expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id3, id3]]])
 
       model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
     end
@@ -960,8 +960,8 @@ describe Gitlab::Database::MigrationHelpers do
     end
 
     it 'queues jobs in bulk all at once (big buffer size)' do
-      expect(BackgroundMigrationWorker).to receive(:perform_bulk).with([['FooJob', [id1, id2]],
-                                                                        ['FooJob', [id3, id3]]])
+      expect(BackgroundMigrationWorker).to receive(:bulk_perform_async).with([['FooJob', [id1, id2]],
+                                                                              ['FooJob', [id3, id3]]])
 
       model.bulk_queue_background_migration_jobs_by_range(User, 'FooJob', batch_size: 2)
     end
diff --git a/spec/services/web_hook_service_spec.rb b/spec/services/web_hook_service_spec.rb
index a669429ce3e..21910e69d2e 100644
--- a/spec/services/web_hook_service_spec.rb
+++ b/spec/services/web_hook_service_spec.rb
@@ -146,7 +146,7 @@ describe WebHookService do
     let(:system_hook) { create(:system_hook) }
 
     it 'enqueue WebHookWorker' do
-      expect(Sidekiq::Client).to receive(:enqueue).with(WebHookWorker, project_hook.id, data, 'push_hooks')
+      expect(WebHookWorker).to receive(:perform_async).with(project_hook.id, data, 'push_hooks')
 
       described_class.new(project_hook, data, 'push_hooks').async_execute
     end
diff --git a/spec/workers/authorized_projects_worker_spec.rb b/spec/workers/authorized_projects_worker_spec.rb
index 90ed1309d4a..0d6eb536c33 100644
--- a/spec/workers/authorized_projects_worker_spec.rb
+++ b/spec/workers/authorized_projects_worker_spec.rb
@@ -65,7 +65,6 @@ describe AuthorizedProjectsWorker do
       args_list = build_args_list(project.owner.id)
       push_bulk_args = {
         'class' => described_class,
-        'queue' => described_class.sidekiq_options['queue'],
         'args' => args_list
       }
 
diff --git a/spec/workers/background_migration_worker_spec.rb b/spec/workers/background_migration_worker_spec.rb
index 4f6e3474634..1c54cf55fa0 100644
--- a/spec/workers/background_migration_worker_spec.rb
+++ b/spec/workers/background_migration_worker_spec.rb
@@ -10,35 +10,4 @@ describe BackgroundMigrationWorker, :sidekiq do
       described_class.new.perform('Foo', [10, 20])
     end
   end
-
-  describe '.perform_bulk' do
-    it 'enqueues background migrations in bulk' do
-      Sidekiq::Testing.fake! do
-        described_class.perform_bulk([['Foo', [1]], ['Foo', [2]]])
-
-        expect(described_class.jobs.count).to eq 2
-        expect(described_class.jobs).to all(include('enqueued_at'))
-      end
-    end
-  end
-
-  describe '.perform_bulk_in' do
-    context 'when delay is valid' do
-      it 'correctly schedules background migrations' do
-        Sidekiq::Testing.fake! do
-          described_class.perform_bulk_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
-
-          expect(described_class.jobs.count).to eq 2
-          expect(described_class.jobs).to all(include('at'))
-        end
-      end
-    end
-
-    context 'when delay is invalid' do
-      it 'raises an ArgumentError exception' do
-        expect { described_class.perform_bulk_in(-60, [['Foo']]) }
-          .to raise_error(ArgumentError)
-      end
-    end
-  end
 end
diff --git a/spec/workers/concerns/application_worker_spec.rb b/spec/workers/concerns/application_worker_spec.rb
index fd434f58602..0145563e0ed 100644
--- a/spec/workers/concerns/application_worker_spec.rb
+++ b/spec/workers/concerns/application_worker_spec.rb
@@ -24,4 +24,35 @@ describe ApplicationWorker do
       expect(worker.queue).to eq('some_queue')
     end
   end
+
+  describe '.bulk_perform_async' do
+    it 'enqueues jobs in bulk' do
+      Sidekiq::Testing.fake! do
+        worker.bulk_perform_async([['Foo', [1]], ['Foo', [2]]])
+
+        expect(worker.jobs.count).to eq 2
+        expect(worker.jobs).to all(include('enqueued_at'))
+      end
+    end
+  end
+
+  describe '.bulk_perform_in' do
+    context 'when delay is valid' do
+      it 'correctly schedules jobs' do
+        Sidekiq::Testing.fake! do
+          worker.bulk_perform_in(1.minute, [['Foo', [1]], ['Foo', [2]]])
+
+          expect(worker.jobs.count).to eq 2
+          expect(worker.jobs).to all(include('at'))
+        end
+      end
+    end
+
+    context 'when delay is invalid' do
+      it 'raises an ArgumentError exception' do
+        expect { worker.bulk_perform_in(-60, [['Foo']]) }
+          .to raise_error(ArgumentError)
+      end
+    end
+  end
 end
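
A sketch of the `run_after_commit_or_now` pattern that `lib/after_commit_queue.rb` gains above and that `SystemHooksService` now relies on; `ExampleModel` and `ExampleWorker` are hypothetical stand-ins, only the `AfterCommitQueue` behaviour comes from the diff:

```ruby
# Hypothetical model; any ActiveRecord model that includes AfterCommitQueue
# (as Group, Key, Member and User do after this commit) behaves like this.
class ExampleModel < ActiveRecord::Base
  include AfterCommitQueue
end

record = ExampleModel.new

ActiveRecord::Base.transaction do
  record.save!

  # Inside a transaction the block is only queued; it runs after COMMIT, so a
  # worker can never observe the uncommitted record. Calling perform_async
  # directly here would raise, because the forbid_sidekiq_in_transactions
  # initializer now checks AfterCommitQueue.inside_transaction?.
  record.run_after_commit_or_now do
    # The block is instance_eval'd on the record, so `id` is the record's id.
    ExampleWorker.perform_async(id)
  end
end

# Outside a transaction the same call runs the block immediately.
record.run_after_commit_or_now do
  ExampleWorker.perform_async(id)
end
```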