diff options
| author | Kamil Trzciński <ayufan@ayufan.eu> | 2018-02-28 20:36:55 +0100 |
|---|---|---|
| committer | Kamil Trzciński <ayufan@ayufan.eu> | 2018-02-28 20:36:55 +0100 |
| commit | 965dc28691e2d70b7040e28d90ccbc3721a9e416 (patch) | |
| tree | 84258f35b72f2e7ce6a7198db66032df4ad5aadb /app/workers | |
| parent | e3fafa7632e038927085cf8c8228c93be44b36bd (diff) | |
| parent | 7fabc892f251740dbd9a4755baede662e6854870 (diff) | |
| download | gitlab-ce-965dc28691e2d70b7040e28d90ccbc3721a9e416.tar.gz | |
Merge commit '7fabc892f251740dbd9a4755baede662e6854870' into object-storage-ee-to-ce-backport
Diffstat (limited to 'app/workers')
96 files changed, 807 insertions, 190 deletions
diff --git a/app/workers/admin_email_worker.rb b/app/workers/admin_email_worker.rb index c2dc955b27c..bec0a003a1c 100644 --- a/app/workers/admin_email_worker.rb +++ b/app/workers/admin_email_worker.rb @@ -1,5 +1,5 @@ class AdminEmailWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/authorized_projects_worker.rb b/app/workers/authorized_projects_worker.rb index 55d8d0c69d1..09559e3b696 100644 --- a/app/workers/authorized_projects_worker.rb +++ b/app/workers/authorized_projects_worker.rb @@ -1,6 +1,5 @@ class AuthorizedProjectsWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker # Schedules multiple jobs and waits for them to be completed. def self.bulk_perform_and_wait(args_list) @@ -17,11 +16,6 @@ class AuthorizedProjectsWorker waiter.wait end - # Schedules multiple jobs to run in sidekiq without waiting for completion - def self.bulk_perform_async(args_list) - Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list) - end - # Performs multiple jobs directly. Failed jobs will be put into sidekiq so # they can benefit from retries def self.bulk_perform_inline(args_list) diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb index 45ce49bb5c0..aeb3bc019b9 100644 --- a/app/workers/background_migration_worker.rb +++ b/app/workers/background_migration_worker.rb @@ -1,34 +1,5 @@ class BackgroundMigrationWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue - - # Enqueues a number of jobs in bulk. - # - # The `jobs` argument should be an Array of Arrays, each sub-array must be in - # the form: - # - # [migration-class, [arg1, arg2, ...]] - def self.perform_bulk(jobs) - Sidekiq::Client.push_bulk('class' => self, - 'queue' => sidekiq_options['queue'], - 'args' => jobs) - end - - # Schedules multiple jobs in bulk, with a delay. - # - def self.perform_bulk_in(delay, jobs) - now = Time.now.to_i - schedule = now + delay.to_i - - if schedule <= now - raise ArgumentError, 'The schedule time must be in the future!' - end - - Sidekiq::Client.push_bulk('class' => self, - 'queue' => sidekiq_options['queue'], - 'args' => jobs, - 'at' => schedule) - end + include ApplicationWorker # Performs the background migration. # diff --git a/app/workers/build_coverage_worker.rb b/app/workers/build_coverage_worker.rb index cd4af85d047..62b212c79be 100644 --- a/app/workers/build_coverage_worker.rb +++ b/app/workers/build_coverage_worker.rb @@ -1,5 +1,5 @@ class BuildCoverageWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue def perform(build_id) diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index 52e7d346e74..5efa9180f5e 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -1,5 +1,5 @@ class BuildFinishedWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :processing diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb index dedaf2835e6..6705a1c2709 100644 --- a/app/workers/build_hooks_worker.rb +++ b/app/workers/build_hooks_worker.rb @@ -1,5 +1,5 @@ class BuildHooksWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :hooks diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb index e5ceb9ef715..fc775a84dc0 100644 --- a/app/workers/build_queue_worker.rb +++ b/app/workers/build_queue_worker.rb @@ -1,5 +1,5 @@ class BuildQueueWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :processing diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb index 20ec24bd18a..ec049821ad7 100644 --- a/app/workers/build_success_worker.rb +++ b/app/workers/build_success_worker.rb @@ -1,5 +1,5 @@ class BuildSuccessWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :processing diff --git a/app/workers/build_trace_sections_worker.rb b/app/workers/build_trace_sections_worker.rb index 8c57e8f767b..c0f5c144e10 100644 --- a/app/workers/build_trace_sections_worker.rb +++ b/app/workers/build_trace_sections_worker.rb @@ -1,5 +1,5 @@ class BuildTraceSectionsWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue def perform(build_id) diff --git a/app/workers/cluster_install_app_worker.rb b/app/workers/cluster_install_app_worker.rb index 899aed904e4..f771cb4939f 100644 --- a/app/workers/cluster_install_app_worker.rb +++ b/app/workers/cluster_install_app_worker.rb @@ -1,5 +1,5 @@ class ClusterInstallAppWorker - include Sidekiq::Worker + include ApplicationWorker include ClusterQueue include ClusterApplications diff --git a/app/workers/cluster_provision_worker.rb b/app/workers/cluster_provision_worker.rb index b01f9708424..1ab4de3b647 100644 --- a/app/workers/cluster_provision_worker.rb +++ b/app/workers/cluster_provision_worker.rb @@ -1,5 +1,5 @@ class ClusterProvisionWorker - include Sidekiq::Worker + include ApplicationWorker include ClusterQueue def perform(cluster_id) diff --git a/app/workers/cluster_wait_for_app_installation_worker.rb b/app/workers/cluster_wait_for_app_installation_worker.rb index 4bb8c293e5d..d564d5e48bf 100644 --- a/app/workers/cluster_wait_for_app_installation_worker.rb +++ b/app/workers/cluster_wait_for_app_installation_worker.rb @@ -1,5 +1,5 @@ class ClusterWaitForAppInstallationWorker - include Sidekiq::Worker + include ApplicationWorker include ClusterQueue include ClusterApplications diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb new file mode 100644 index 00000000000..9c3bdabc49e --- /dev/null +++ b/app/workers/concerns/application_worker.rb @@ -0,0 +1,40 @@ +Sidekiq::Worker.extend ActiveSupport::Concern + +module ApplicationWorker + extend ActiveSupport::Concern + + include Sidekiq::Worker + + included do + sidekiq_options queue: base_queue_name + end + + module ClassMethods + def base_queue_name + name + .sub(/\AGitlab::/, '') + .sub(/Worker\z/, '') + .underscore + .tr('/', '_') + end + + def queue + get_sidekiq_options['queue'].to_s + end + + def bulk_perform_async(args_list) + Sidekiq::Client.push_bulk('class' => self, 'args' => args_list) + end + + def bulk_perform_in(delay, args_list) + now = Time.now.to_i + schedule = now + delay.to_i + + if schedule <= now + raise ArgumentError, 'The schedule time must be in the future!' + end + + Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule) + end + end +end diff --git a/app/workers/concerns/dedicated_sidekiq_queue.rb b/app/workers/concerns/dedicated_sidekiq_queue.rb deleted file mode 100644 index 132bae6022b..00000000000 --- a/app/workers/concerns/dedicated_sidekiq_queue.rb +++ /dev/null @@ -1,9 +0,0 @@ -# Concern that sets the queue of a Sidekiq worker based on the worker's class -# name/namespace. -module DedicatedSidekiqQueue - extend ActiveSupport::Concern - - included do - sidekiq_options queue: name.sub(/Worker\z/, '').underscore.tr('/', '_') - end -end diff --git a/app/workers/concerns/gitlab/github_import/notify_upon_death.rb b/app/workers/concerns/gitlab/github_import/notify_upon_death.rb new file mode 100644 index 00000000000..3d7120665b6 --- /dev/null +++ b/app/workers/concerns/gitlab/github_import/notify_upon_death.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + # NotifyUponDeath can be included into a GitHub worker class if it should + # notify any JobWaiter instances upon being moved to the Sidekiq dead queue. + # + # Note that this will only notify the waiter upon graceful termination, a + # SIGKILL will still result in the waiter _not_ being notified. + # + # Workers including this module must have jobs passed where the last + # argument is the key to notify, as a String. + module NotifyUponDeath + extend ActiveSupport::Concern + + included do + # If a job is being exhausted we still want to notify the + # AdvanceStageWorker. This prevents the entire import from getting stuck + # just because 1 job threw too many errors. + sidekiq_retries_exhausted do |job| + args = job['args'] + jid = job['jid'] + + if args.length == 3 && (key = args.last) && key.is_a?(String) + JobWaiter.notify(key, jid) + end + end + end + end + end +end diff --git a/app/workers/concerns/gitlab/github_import/object_importer.rb b/app/workers/concerns/gitlab/github_import/object_importer.rb new file mode 100644 index 00000000000..9a9fbaad653 --- /dev/null +++ b/app/workers/concerns/gitlab/github_import/object_importer.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + # ObjectImporter defines the base behaviour for every Sidekiq worker that + # imports a single resource such as a note or pull request. + module ObjectImporter + extend ActiveSupport::Concern + + included do + include ApplicationWorker + include GithubImport::Queue + include ReschedulingMethods + include NotifyUponDeath + end + + # project - An instance of `Project` to import the data into. + # client - An instance of `Gitlab::GithubImport::Client` + # hash - A Hash containing the details of the object to import. + def import(project, client, hash) + object = representation_class.from_json_hash(hash) + + importer_class.new(object, project, client).execute + + counter.increment(project: project.path_with_namespace) + end + + def counter + @counter ||= Gitlab::Metrics.counter(counter_name, counter_description) + end + + # Returns the representation class to use for the object. This class must + # define the class method `from_json_hash`. + def representation_class + raise NotImplementedError + end + + # Returns the class to use for importing the object. + def importer_class + raise NotImplementedError + end + + # Returns the name (as a Symbol) of the Prometheus counter. + def counter_name + raise NotImplementedError + end + + # Returns the description (as a String) of the Prometheus counter. + def counter_description + raise NotImplementedError + end + end + end +end diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb new file mode 100644 index 00000000000..a2bee361b86 --- /dev/null +++ b/app/workers/concerns/gitlab/github_import/queue.rb @@ -0,0 +1,16 @@ +module Gitlab + module GithubImport + module Queue + extend ActiveSupport::Concern + + included do + # If a job produces an error it may block a stage from advancing + # forever. To prevent this from happening we prevent jobs from going to + # the dead queue. This does mean some resources may not be imported, but + # this is better than a project being stuck in the "import" state + # forever. + sidekiq_options queue: 'github_importer', dead: false, retry: 5 + end + end + end +end diff --git a/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb new file mode 100644 index 00000000000..692ca6b7f42 --- /dev/null +++ b/app/workers/concerns/gitlab/github_import/rescheduling_methods.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + # Module that provides methods shared by the various workers used for + # importing GitHub projects. + module ReschedulingMethods + # project_id - The ID of the GitLab project to import the note into. + # hash - A Hash containing the details of the GitHub object to imoprt. + # notify_key - The Redis key to notify upon completion, if any. + def perform(project_id, hash, notify_key = nil) + project = Project.find_by(id: project_id) + + return notify_waiter(notify_key) unless project + + client = GithubImport.new_client_for(project, parallel: true) + + if try_import(project, client, hash) + notify_waiter(notify_key) + else + # In the event of hitting the rate limit we want to reschedule the job + # so its retried after our rate limit has been reset. + self.class + .perform_in(client.rate_limit_resets_in, project.id, hash, notify_key) + end + end + + def try_import(*args) + import(*args) + true + rescue RateLimitError + false + end + + def notify_waiter(key = nil) + JobWaiter.notify(key, jid) if key + end + end + end +end diff --git a/app/workers/concerns/gitlab/github_import/stage_methods.rb b/app/workers/concerns/gitlab/github_import/stage_methods.rb new file mode 100644 index 00000000000..147c8c8d683 --- /dev/null +++ b/app/workers/concerns/gitlab/github_import/stage_methods.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module StageMethods + # project_id - The ID of the GitLab project to import the data into. + def perform(project_id) + return unless (project = find_project(project_id)) + + client = GithubImport.new_client_for(project) + + try_import(client, project) + end + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def try_import(client, project) + import(client, project) + rescue RateLimitError + self.class.perform_in(client.rate_limit_resets_in, project.id) + end + + def find_project(id) + # If the project has been marked as failed we want to bail out + # automatically. + Project.import_started.find_by(id: id) + end + end + end +end diff --git a/app/workers/create_gpg_signature_worker.rb b/app/workers/create_gpg_signature_worker.rb index 9b5ff17aafa..f371731f68c 100644 --- a/app/workers/create_gpg_signature_worker.rb +++ b/app/workers/create_gpg_signature_worker.rb @@ -1,6 +1,5 @@ class CreateGpgSignatureWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(commit_sha, project_id) project = Project.find_by(id: project_id) diff --git a/app/workers/create_pipeline_worker.rb b/app/workers/create_pipeline_worker.rb new file mode 100644 index 00000000000..00cd7b85b9f --- /dev/null +++ b/app/workers/create_pipeline_worker.rb @@ -0,0 +1,16 @@ +class CreatePipelineWorker + include ApplicationWorker + include PipelineQueue + + enqueue_in group: :creation + + def perform(project_id, user_id, ref, source, params = {}) + project = Project.find(project_id) + user = User.find(user_id) + params = params.deep_symbolize_keys + + Ci::CreatePipelineService + .new(project, user, ref: ref) + .execute(source, **params) + end +end diff --git a/app/workers/delete_merged_branches_worker.rb b/app/workers/delete_merged_branches_worker.rb index f870da4ecfd..07cd1f02fb5 100644 --- a/app/workers/delete_merged_branches_worker.rb +++ b/app/workers/delete_merged_branches_worker.rb @@ -1,6 +1,5 @@ class DeleteMergedBranchesWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(project_id, user_id) begin diff --git a/app/workers/delete_user_worker.rb b/app/workers/delete_user_worker.rb index 3340a7be4fe..6c431b02979 100644 --- a/app/workers/delete_user_worker.rb +++ b/app/workers/delete_user_worker.rb @@ -1,6 +1,5 @@ class DeleteUserWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(current_user_id, delete_user_id, options = {}) delete_user = User.find(delete_user_id) diff --git a/app/workers/email_receiver_worker.rb b/app/workers/email_receiver_worker.rb index 1afa24c8e2a..dd8a6cbbef1 100644 --- a/app/workers/email_receiver_worker.rb +++ b/app/workers/email_receiver_worker.rb @@ -1,6 +1,5 @@ class EmailReceiverWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(raw) return unless Gitlab::IncomingEmail.enabled? @@ -39,8 +38,7 @@ class EmailReceiverWorker "You are not allowed to perform this action. If you believe this is in error, contact a staff member." when Gitlab::Email::NoteableNotFoundError "The thread you are replying to no longer exists, perhaps it was deleted? If you believe this is in error, contact a staff member." - when Gitlab::Email::InvalidNoteError, - Gitlab::Email::InvalidIssueError + when Gitlab::Email::InvalidRecordError can_retry = true e.message end diff --git a/app/workers/emails_on_push_worker.rb b/app/workers/emails_on_push_worker.rb index f5ccc84c160..21da27973fe 100644 --- a/app/workers/emails_on_push_worker.rb +++ b/app/workers/emails_on_push_worker.rb @@ -1,6 +1,5 @@ class EmailsOnPushWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker attr_reader :email, :skip_premailer diff --git a/app/workers/expire_build_artifacts_worker.rb b/app/workers/expire_build_artifacts_worker.rb index a27585fd389..87e5dca01fd 100644 --- a/app/workers/expire_build_artifacts_worker.rb +++ b/app/workers/expire_build_artifacts_worker.rb @@ -1,5 +1,5 @@ class ExpireBuildArtifactsWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform @@ -8,6 +8,6 @@ class ExpireBuildArtifactsWorker build_ids = Ci::Build.with_expired_artifacts.pluck(:id) build_ids = build_ids.map { |build_id| [build_id] } - Sidekiq::Client.push_bulk('class' => ExpireBuildInstanceArtifactsWorker, 'args' => build_ids ) + ExpireBuildInstanceArtifactsWorker.bulk_perform_async(build_ids) end end diff --git a/app/workers/expire_build_instance_artifacts_worker.rb b/app/workers/expire_build_instance_artifacts_worker.rb index 7b59e976492..234b4357cf7 100644 --- a/app/workers/expire_build_instance_artifacts_worker.rb +++ b/app/workers/expire_build_instance_artifacts_worker.rb @@ -1,6 +1,5 @@ class ExpireBuildInstanceArtifactsWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(build_id) build = Ci::Build diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb index 98a7500bffe..a591e2da519 100644 --- a/app/workers/expire_job_cache_worker.rb +++ b/app/workers/expire_job_cache_worker.rb @@ -1,5 +1,5 @@ class ExpireJobCacheWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :cache diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index 1a0e7f92875..a3ac32b437d 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -1,5 +1,5 @@ class ExpirePipelineCacheWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :cache diff --git a/app/workers/git_garbage_collect_worker.rb b/app/workers/git_garbage_collect_worker.rb index ec65d3ff65e..8e26275669e 100644 --- a/app/workers/git_garbage_collect_worker.rb +++ b/app/workers/git_garbage_collect_worker.rb @@ -1,6 +1,5 @@ class GitGarbageCollectWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker include Gitlab::CurrentSettings sidekiq_options retry: false diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb new file mode 100644 index 00000000000..400396d5755 --- /dev/null +++ b/app/workers/gitlab/github_import/advance_stage_worker.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + # AdvanceStageWorker is a worker used by the GitHub importer to wait for a + # number of jobs to complete, without blocking a thread. Once all jobs have + # been completed this worker will advance the import process to the next + # stage. + class AdvanceStageWorker + include ApplicationWorker + + sidekiq_options queue: 'github_importer_advance_stage', dead: false + + INTERVAL = 30.seconds.to_i + + # The number of seconds to wait (while blocking the thread) before + # continueing to the next waiter. + BLOCKING_WAIT_TIME = 5 + + # The known importer stages and their corresponding Sidekiq workers. + STAGES = { + issues_and_diff_notes: Stage::ImportIssuesAndDiffNotesWorker, + notes: Stage::ImportNotesWorker, + finish: Stage::FinishImportWorker + }.freeze + + # project_id - The ID of the project being imported. + # waiters - A Hash mapping Gitlab::JobWaiter keys to the number of + # remaining jobs. + # next_stage - The name of the next stage to start when all jobs have been + # completed. + def perform(project_id, waiters, next_stage) + return unless (project = find_project(project_id)) + + new_waiters = wait_for_jobs(waiters) + + if new_waiters.empty? + # We refresh the import JID here so workers importing individual + # resources (e.g. notes) don't have to do this all the time, reducing + # the pressure on Redis. We _only_ do this once all jobs are done so + # we don't get stuck forever if one or more jobs failed to notify the + # JobWaiter. + project.refresh_import_jid_expiration + + STAGES.fetch(next_stage.to_sym).perform_async(project_id) + else + self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage) + end + end + + def wait_for_jobs(waiters) + waiters.each_with_object({}) do |(key, remaining), new_waiters| + waiter = JobWaiter.new(remaining, key) + + # We wait for a brief moment of time so we don't reschedule if we can + # complete the work fast enough. + waiter.wait(BLOCKING_WAIT_TIME) + + next unless waiter.jobs_remaining.positive? + + new_waiters[waiter.key] = waiter.jobs_remaining + end + end + + def find_project(id) + # We only care about the import JID so we can refresh it. We also only + # want the project if it hasn't been marked as failed yet. It's possible + # the import gets marked as stuck when jobs of the current stage failed + # somehow. + Project.select(:import_jid).import_started.find_by(id: id) + end + end + end +end diff --git a/app/workers/gitlab/github_import/import_diff_note_worker.rb b/app/workers/gitlab/github_import/import_diff_note_worker.rb new file mode 100644 index 00000000000..ef2a74c51c5 --- /dev/null +++ b/app/workers/gitlab/github_import/import_diff_note_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + class ImportDiffNoteWorker + include ObjectImporter + + def representation_class + Representation::DiffNote + end + + def importer_class + Importer::DiffNoteImporter + end + + def counter_name + :github_importer_imported_diff_notes + end + + def counter_description + 'The number of imported GitHub pull request review comments' + end + end + end +end diff --git a/app/workers/gitlab/github_import/import_issue_worker.rb b/app/workers/gitlab/github_import/import_issue_worker.rb new file mode 100644 index 00000000000..1b081ae5966 --- /dev/null +++ b/app/workers/gitlab/github_import/import_issue_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + class ImportIssueWorker + include ObjectImporter + + def representation_class + Representation::Issue + end + + def importer_class + Importer::IssueAndLabelLinksImporter + end + + def counter_name + :github_importer_imported_issues + end + + def counter_description + 'The number of imported GitHub issues' + end + end + end +end diff --git a/app/workers/gitlab/github_import/import_note_worker.rb b/app/workers/gitlab/github_import/import_note_worker.rb new file mode 100644 index 00000000000..d2b4c36a5b9 --- /dev/null +++ b/app/workers/gitlab/github_import/import_note_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + class ImportNoteWorker + include ObjectImporter + + def representation_class + Representation::Note + end + + def importer_class + Importer::NoteImporter + end + + def counter_name + :github_importer_imported_notes + end + + def counter_description + 'The number of imported GitHub comments' + end + end + end +end diff --git a/app/workers/gitlab/github_import/import_pull_request_worker.rb b/app/workers/gitlab/github_import/import_pull_request_worker.rb new file mode 100644 index 00000000000..62a6da152a3 --- /dev/null +++ b/app/workers/gitlab/github_import/import_pull_request_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + class ImportPullRequestWorker + include ObjectImporter + + def representation_class + Representation::PullRequest + end + + def importer_class + Importer::PullRequestImporter + end + + def counter_name + :github_importer_imported_pull_requests + end + + def counter_description + 'The number of imported GitHub pull requests' + end + end + end +end diff --git a/app/workers/gitlab/github_import/refresh_import_jid_worker.rb b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb new file mode 100644 index 00000000000..7108b531bc2 --- /dev/null +++ b/app/workers/gitlab/github_import/refresh_import_jid_worker.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + class RefreshImportJidWorker + include ApplicationWorker + include GithubImport::Queue + + # The interval to schedule new instances of this job at. + INTERVAL = 1.minute.to_i + + def self.perform_in_the_future(*args) + perform_in(INTERVAL, *args) + end + + # project_id - The ID of the project that is being imported. + # check_job_id - The ID of the job for which to check the status. + def perform(project_id, check_job_id) + return unless (project = find_project(project_id)) + + if SidekiqStatus.running?(check_job_id) + # As long as the repository is being cloned we want to keep refreshing + # the import JID status. + project.refresh_import_jid_expiration + self.class.perform_in_the_future(project_id, check_job_id) + end + + # If the job is no longer running there's nothing else we need to do. If + # the clone job completed successfully it will have scheduled the next + # stage, if it died there's nothing we can do anyway. + end + + def find_project(id) + Project.select(:import_jid).import_started.find_by(id: id) + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/finish_import_worker.rb b/app/workers/gitlab/github_import/stage/finish_import_worker.rb new file mode 100644 index 00000000000..073d6608082 --- /dev/null +++ b/app/workers/gitlab/github_import/stage/finish_import_worker.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class FinishImportWorker + include ApplicationWorker + include GithubImport::Queue + include StageMethods + + # project - An instance of Project. + def import(_, project) + project.after_import + report_import_time(project) + end + + def report_import_time(project) + duration = Time.zone.now - project.created_at + path = project.path_with_namespace + + histogram.observe({ project: path }, duration) + counter.increment + + logger.info("GitHub importer finished for #{path} in #{duration.round(2)} seconds") + end + + def histogram + @histogram ||= Gitlab::Metrics.histogram( + :github_importer_total_duration_seconds, + 'Total time spent importing GitHub projects, in seconds' + ) + end + + def counter + @counter ||= Gitlab::Metrics.counter( + :github_importer_imported_projects, + 'The number of imported GitHub projects' + ) + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_base_data_worker.rb b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb new file mode 100644 index 00000000000..5726fbb573d --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_base_data_worker.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportBaseDataWorker + include ApplicationWorker + include GithubImport::Queue + include StageMethods + + # These importers are fast enough that we can just run them in the same + # thread. + IMPORTERS = [ + Importer::LabelsImporter, + Importer::MilestonesImporter, + Importer::ReleasesImporter + ].freeze + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + IMPORTERS.each do |klass| + klass.new(project, client).execute + end + + project.refresh_import_jid_expiration + + ImportPullRequestsWorker.perform_async(project.id) + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb new file mode 100644 index 00000000000..7007754ff2e --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_issues_and_diff_notes_worker.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportIssuesAndDiffNotesWorker + include ApplicationWorker + include GithubImport::Queue + include StageMethods + + # The importers to run in this stage. Issues can't be imported earlier + # on as we also use these to enrich pull requests with assigned labels. + IMPORTERS = [ + Importer::IssuesImporter, + Importer::DiffNotesImporter + ].freeze + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + waiters = IMPORTERS.each_with_object({}) do |klass, hash| + waiter = klass.new(project, client).execute + hash[waiter.key] = waiter.jobs_remaining + end + + AdvanceStageWorker.perform_async(project.id, waiters, :notes) + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_notes_worker.rb b/app/workers/gitlab/github_import/stage/import_notes_worker.rb new file mode 100644 index 00000000000..5f4678a595f --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_notes_worker.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportNotesWorker + include ApplicationWorker + include GithubImport::Queue + include StageMethods + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + waiter = Importer::NotesImporter + .new(project, client) + .execute + + AdvanceStageWorker.perform_async( + project.id, + { waiter.key => waiter.jobs_remaining }, + :finish + ) + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb new file mode 100644 index 00000000000..1c5a7139802 --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_worker.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportPullRequestsWorker + include ApplicationWorker + include GithubImport::Queue + include StageMethods + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + waiter = Importer::PullRequestsImporter + .new(project, client) + .execute + + project.refresh_import_jid_expiration + + AdvanceStageWorker.perform_async( + project.id, + { waiter.key => waiter.jobs_remaining }, + :issues_and_diff_notes + ) + end + end + end + end +end diff --git a/app/workers/gitlab/github_import/stage/import_repository_worker.rb b/app/workers/gitlab/github_import/stage/import_repository_worker.rb new file mode 100644 index 00000000000..4d16cef1130 --- /dev/null +++ b/app/workers/gitlab/github_import/stage/import_repository_worker.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module Stage + class ImportRepositoryWorker + include ApplicationWorker + include GithubImport::Queue + include StageMethods + + # client - An instance of Gitlab::GithubImport::Client. + # project - An instance of Project. + def import(client, project) + # In extreme cases it's possible for a clone to take more than the + # import job expiration time. To work around this we schedule a + # separate job that will periodically run and refresh the import + # expiration time. + RefreshImportJidWorker.perform_in_the_future(project.id, jid) + + importer = Importer::RepositoryImporter.new(project, client) + + return unless importer.execute + + counter.increment + + ImportBaseDataWorker.perform_async(project.id) + end + + def counter + Gitlab::Metrics.counter( + :github_importer_imported_repositories, + 'The number of imported GitHub repositories' + ) + end + end + end + end +end diff --git a/app/workers/gitlab_shell_worker.rb b/app/workers/gitlab_shell_worker.rb index 0ec871e00e1..a0028e41332 100644 --- a/app/workers/gitlab_shell_worker.rb +++ b/app/workers/gitlab_shell_worker.rb @@ -1,7 +1,6 @@ class GitlabShellWorker - include Sidekiq::Worker + include ApplicationWorker include Gitlab::ShellAdapter - include DedicatedSidekiqQueue def perform(action, *arg) gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend diff --git a/app/workers/gitlab_usage_ping_worker.rb b/app/workers/gitlab_usage_ping_worker.rb index 0a55aab63fd..6dd281b1147 100644 --- a/app/workers/gitlab_usage_ping_worker.rb +++ b/app/workers/gitlab_usage_ping_worker.rb @@ -1,7 +1,7 @@ class GitlabUsagePingWorker LEASE_TIMEOUT = 86400 - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/group_destroy_worker.rb b/app/workers/group_destroy_worker.rb index bd8e212e928..f577b310b20 100644 --- a/app/workers/group_destroy_worker.rb +++ b/app/workers/group_destroy_worker.rb @@ -1,6 +1,5 @@ class GroupDestroyWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker include ExceptionBacktrace def perform(group_id, user_id) diff --git a/app/workers/import_export_project_cleanup_worker.rb b/app/workers/import_export_project_cleanup_worker.rb index 7957ed807ab..9788c8df3a3 100644 --- a/app/workers/import_export_project_cleanup_worker.rb +++ b/app/workers/import_export_project_cleanup_worker.rb @@ -1,5 +1,5 @@ class ImportExportProjectCleanupWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/invalid_gpg_signature_update_worker.rb b/app/workers/invalid_gpg_signature_update_worker.rb index db6b1ea8e8d..6774ab307c6 100644 --- a/app/workers/invalid_gpg_signature_update_worker.rb +++ b/app/workers/invalid_gpg_signature_update_worker.rb @@ -1,6 +1,5 @@ class InvalidGpgSignatureUpdateWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(gpg_key_id) gpg_key = GpgKey.find_by(id: gpg_key_id) diff --git a/app/workers/irker_worker.rb b/app/workers/irker_worker.rb index 3dd14466994..9ae5456be4c 100644 --- a/app/workers/irker_worker.rb +++ b/app/workers/irker_worker.rb @@ -2,8 +2,7 @@ require 'json' require 'socket' class IrkerWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(project_id, chans, colors, push_data, settings) project = Project.find(project_id) @@ -104,6 +103,7 @@ class IrkerWorker parents = commit.parents # Return old value if there's no new one return push_data['before'] if parents.empty? + # Or return the first parent-commit parents[0].id end diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb index 48e2da338f6..ba832fe30c6 100644 --- a/app/workers/merge_worker.rb +++ b/app/workers/merge_worker.rb @@ -1,6 +1,5 @@ class MergeWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(merge_request_id, current_user_id, params) params = params.with_indifferent_access diff --git a/app/workers/namespaceless_project_destroy_worker.rb b/app/workers/namespaceless_project_destroy_worker.rb index f1cd1769421..adb25c2a170 100644 --- a/app/workers/namespaceless_project_destroy_worker.rb +++ b/app/workers/namespaceless_project_destroy_worker.rb @@ -5,14 +5,9 @@ # The worker will reject doing anything for projects that *do* have a # namespace. For those use ProjectDestroyWorker instead. class NamespacelessProjectDestroyWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker include ExceptionBacktrace - def self.bulk_perform_async(args_list) - Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list) - end - def perform(project_id) begin project = Project.unscoped.find(project_id) diff --git a/app/workers/new_issue_worker.rb b/app/workers/new_issue_worker.rb index d9a8e892e90..3bc030f9c62 100644 --- a/app/workers/new_issue_worker.rb +++ b/app/workers/new_issue_worker.rb @@ -1,6 +1,5 @@ class NewIssueWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker include NewIssuable def perform(issue_id, user_id) diff --git a/app/workers/new_merge_request_worker.rb b/app/workers/new_merge_request_worker.rb index 1910c490159..bda2a0ab59d 100644 --- a/app/workers/new_merge_request_worker.rb +++ b/app/workers/new_merge_request_worker.rb @@ -1,6 +1,5 @@ class NewMergeRequestWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker include NewIssuable def perform(merge_request_id, user_id) diff --git a/app/workers/new_note_worker.rb b/app/workers/new_note_worker.rb index 926162b8c53..67c54fbf10e 100644 --- a/app/workers/new_note_worker.rb +++ b/app/workers/new_note_worker.rb @@ -1,6 +1,5 @@ class NewNoteWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker # Keep extra parameter to preserve backwards compatibility with # old `NewNoteWorker` jobs (can remove later) diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb index 64788da7299..62f733c02fc 100644 --- a/app/workers/pages_worker.rb +++ b/app/workers/pages_worker.rb @@ -1,5 +1,5 @@ class PagesWorker - include Sidekiq::Worker + include ApplicationWorker sidekiq_options queue: :pages, retry: false diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb index 30a75ec8435..661c29efe88 100644 --- a/app/workers/pipeline_hooks_worker.rb +++ b/app/workers/pipeline_hooks_worker.rb @@ -1,5 +1,5 @@ class PipelineHooksWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :hooks diff --git a/app/workers/pipeline_metrics_worker.rb b/app/workers/pipeline_metrics_worker.rb index 070943f1ecc..d46d1f122fc 100644 --- a/app/workers/pipeline_metrics_worker.rb +++ b/app/workers/pipeline_metrics_worker.rb @@ -1,5 +1,5 @@ class PipelineMetricsWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue def perform(pipeline_id) diff --git a/app/workers/pipeline_notification_worker.rb b/app/workers/pipeline_notification_worker.rb index cdb860b6675..a9a1168a6e3 100644 --- a/app/workers/pipeline_notification_worker.rb +++ b/app/workers/pipeline_notification_worker.rb @@ -1,5 +1,5 @@ class PipelineNotificationWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue def perform(pipeline_id, recipients = nil) diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index 8c067d05081..07dbf6a971e 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -1,5 +1,5 @@ class PipelineProcessWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :processing diff --git a/app/workers/pipeline_schedule_worker.rb b/app/workers/pipeline_schedule_worker.rb index d7087f20dfc..c49758878a4 100644 --- a/app/workers/pipeline_schedule_worker.rb +++ b/app/workers/pipeline_schedule_worker.rb @@ -1,5 +1,5 @@ class PipelineScheduleWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform @@ -9,7 +9,7 @@ class PipelineScheduleWorker pipeline = Ci::CreatePipelineService.new(schedule.project, schedule.owner, ref: schedule.ref) - .execute(:schedule, save_on_errors: false, schedule: schedule) + .execute(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule) schedule.deactivate! unless pipeline.persisted? rescue => e diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb index cb8bb2ffe75..68c40a259e1 100644 --- a/app/workers/pipeline_success_worker.rb +++ b/app/workers/pipeline_success_worker.rb @@ -1,5 +1,5 @@ class PipelineSuccessWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :processing diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb index 5fa399dff4c..24a8a9fbed5 100644 --- a/app/workers/pipeline_update_worker.rb +++ b/app/workers/pipeline_update_worker.rb @@ -1,5 +1,5 @@ class PipelineUpdateWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :processing diff --git a/app/workers/post_receive.rb b/app/workers/post_receive.rb index b8f8d3750d9..f2b2c4428d3 100644 --- a/app/workers/post_receive.rb +++ b/app/workers/post_receive.rb @@ -1,6 +1,5 @@ class PostReceive - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(gl_repository, identifier, changes) project, is_wiki = Gitlab::GlRepository.parse(gl_repository) diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb index c0c03848a40..52eebe475ec 100644 --- a/app/workers/process_commit_worker.rb +++ b/app/workers/process_commit_worker.rb @@ -5,8 +5,7 @@ # Consider using an extra worker if you need to add any extra (and potentially # slow) processing of commits. class ProcessCommitWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker # project_id - The ID of the project this commit belongs to. # user_id - The ID of the user that pushed the commit. diff --git a/app/workers/project_cache_worker.rb b/app/workers/project_cache_worker.rb index 505ff9e086e..f19bcbf946a 100644 --- a/app/workers/project_cache_worker.rb +++ b/app/workers/project_cache_worker.rb @@ -1,7 +1,6 @@ # Worker for updating any project specific caches. class ProjectCacheWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker LEASE_TIMEOUT = 15.minutes.to_i diff --git a/app/workers/project_destroy_worker.rb b/app/workers/project_destroy_worker.rb index 3be7e686609..1ba854ca4cb 100644 --- a/app/workers/project_destroy_worker.rb +++ b/app/workers/project_destroy_worker.rb @@ -1,6 +1,5 @@ class ProjectDestroyWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker include ExceptionBacktrace def perform(project_id, user_id, params) diff --git a/app/workers/project_export_worker.rb b/app/workers/project_export_worker.rb index f13ac9e5db2..c100852374a 100644 --- a/app/workers/project_export_worker.rb +++ b/app/workers/project_export_worker.rb @@ -1,6 +1,5 @@ class ProjectExportWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker include ExceptionBacktrace sidekiq_options retry: 3 diff --git a/app/workers/project_migrate_hashed_storage_worker.rb b/app/workers/project_migrate_hashed_storage_worker.rb index ca276d7801c..d01eb744e5d 100644 --- a/app/workers/project_migrate_hashed_storage_worker.rb +++ b/app/workers/project_migrate_hashed_storage_worker.rb @@ -1,11 +1,34 @@ class ProjectMigrateHashedStorageWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker + + LEASE_TIMEOUT = 30.seconds.to_i def perform(project_id) project = Project.find_by(id: project_id) return if project.nil? || project.pending_delete? - ::Projects::HashedStorageMigrationService.new(project, logger).execute + uuid = lease_for(project_id).try_obtain + if uuid + ::Projects::HashedStorageMigrationService.new(project, logger).execute + else + false + end + rescue => ex + cancel_lease_for(project_id, uuid) if uuid + raise ex + end + + def lease_for(project_id) + Gitlab::ExclusiveLease.new(lease_key(project_id), timeout: LEASE_TIMEOUT) + end + + private + + def lease_key(project_id) + "project_migrate_hashed_storage_worker:#{project_id}" + end + + def cancel_lease_for(project_id, uuid) + Gitlab::ExclusiveLease.cancel(lease_key(project_id), uuid) end end diff --git a/app/workers/project_service_worker.rb b/app/workers/project_service_worker.rb index 4883d848c53..75c4b8b3663 100644 --- a/app/workers/project_service_worker.rb +++ b/app/workers/project_service_worker.rb @@ -1,6 +1,5 @@ class ProjectServiceWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker sidekiq_options dead: false diff --git a/app/workers/propagate_service_template_worker.rb b/app/workers/propagate_service_template_worker.rb index 6b607451c7a..635a97c99af 100644 --- a/app/workers/propagate_service_template_worker.rb +++ b/app/workers/propagate_service_template_worker.rb @@ -1,7 +1,6 @@ # Worker for updating any project specific caches. class PropagateServiceTemplateWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker LEASE_TIMEOUT = 4.hours.to_i diff --git a/app/workers/prune_old_events_worker.rb b/app/workers/prune_old_events_worker.rb index 2b43bb19ad1..5ff62ab1369 100644 --- a/app/workers/prune_old_events_worker.rb +++ b/app/workers/prune_old_events_worker.rb @@ -1,5 +1,5 @@ class PruneOldEventsWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/reactive_caching_worker.rb b/app/workers/reactive_caching_worker.rb index 18b8daf4e1e..ef3ddb9024b 100644 --- a/app/workers/reactive_caching_worker.rb +++ b/app/workers/reactive_caching_worker.rb @@ -1,6 +1,5 @@ class ReactiveCachingWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(class_name, id, *args) klass = begin diff --git a/app/workers/remove_expired_group_links_worker.rb b/app/workers/remove_expired_group_links_worker.rb index 2a619f83410..7e64c3070a8 100644 --- a/app/workers/remove_expired_group_links_worker.rb +++ b/app/workers/remove_expired_group_links_worker.rb @@ -1,5 +1,5 @@ class RemoveExpiredGroupLinksWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/remove_expired_members_worker.rb b/app/workers/remove_expired_members_worker.rb index 31f652e5f9b..d80b3b15840 100644 --- a/app/workers/remove_expired_members_worker.rb +++ b/app/workers/remove_expired_members_worker.rb @@ -1,5 +1,5 @@ class RemoveExpiredMembersWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/remove_old_web_hook_logs_worker.rb b/app/workers/remove_old_web_hook_logs_worker.rb index 555e1bb8691..87fed42d7ce 100644 --- a/app/workers/remove_old_web_hook_logs_worker.rb +++ b/app/workers/remove_old_web_hook_logs_worker.rb @@ -1,5 +1,5 @@ class RemoveOldWebHookLogsWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue WEB_HOOK_LOG_LIFETIME = 2.days diff --git a/app/workers/remove_unreferenced_lfs_objects_worker.rb b/app/workers/remove_unreferenced_lfs_objects_worker.rb index b80f131d5f7..8daf079fc31 100644 --- a/app/workers/remove_unreferenced_lfs_objects_worker.rb +++ b/app/workers/remove_unreferenced_lfs_objects_worker.rb @@ -1,5 +1,5 @@ class RemoveUnreferencedLfsObjectsWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/repository_archive_cache_worker.rb b/app/workers/repository_archive_cache_worker.rb index e47069df189..86a258cf94f 100644 --- a/app/workers/repository_archive_cache_worker.rb +++ b/app/workers/repository_archive_cache_worker.rb @@ -1,5 +1,5 @@ class RepositoryArchiveCacheWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/repository_check/batch_worker.rb b/app/workers/repository_check/batch_worker.rb index b94d83bd709..76688cf51c1 100644 --- a/app/workers/repository_check/batch_worker.rb +++ b/app/workers/repository_check/batch_worker.rb @@ -1,6 +1,6 @@ module RepositoryCheck class BatchWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue RUN_TIME = 3600 diff --git a/app/workers/repository_check/clear_worker.rb b/app/workers/repository_check/clear_worker.rb index 85bc9103538..97b89dc3db5 100644 --- a/app/workers/repository_check/clear_worker.rb +++ b/app/workers/repository_check/clear_worker.rb @@ -1,6 +1,6 @@ module RepositoryCheck class ClearWorker - include Sidekiq::Worker + include ApplicationWorker include RepositoryCheckQueue def perform diff --git a/app/workers/repository_check/single_repository_worker.rb b/app/workers/repository_check/single_repository_worker.rb index 164586cf0b7..4e3c691e8da 100644 --- a/app/workers/repository_check/single_repository_worker.rb +++ b/app/workers/repository_check/single_repository_worker.rb @@ -1,6 +1,6 @@ module RepositoryCheck class SingleRepositoryWorker - include Sidekiq::Worker + include ApplicationWorker include RepositoryCheckQueue def perform(project_id) @@ -32,16 +32,14 @@ module RepositoryCheck end def git_fsck(repository) - path = repository.path_to_repo - cmd = %W(nice git --git-dir=#{path} fsck) - output, status = Gitlab::Popen.popen(cmd) + return false unless repository.exists? - if status.zero? - true - else - Gitlab::RepositoryCheckLogger.error("command failed: #{cmd.join(' ')}\n#{output}") - false - end + repository.raw_repository.fsck + + true + rescue Gitlab::Git::Repository::GitError => e + Gitlab::RepositoryCheckLogger.error(e.message) + false end def has_pushes?(project) diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb index 264706e3e23..a07ef1705a1 100644 --- a/app/workers/repository_fork_worker.rb +++ b/app/workers/repository_fork_worker.rb @@ -1,25 +1,24 @@ class RepositoryForkWorker ForkError = Class.new(StandardError) - include Sidekiq::Worker + include ApplicationWorker include Gitlab::ShellAdapter - include DedicatedSidekiqQueue include ProjectStartImport sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION - def perform(project_id, forked_from_repository_storage_path, source_path, target_path) + def perform(project_id, forked_from_repository_storage_path, source_disk_path) project = Project.find(project_id) return unless start_fork(project) Gitlab::Metrics.add_event(:fork_repository, - source_path: source_path, - target_path: target_path) + source_path: source_disk_path, + target_path: project.disk_path) - result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_path, - project.repository_storage_path, target_path) - raise ForkError, "Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}" unless result + result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_disk_path, + project.repository_storage_path, project.disk_path) + raise ForkError, "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result project.repository.after_import raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo? diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb index d7c0043d3b6..55715c83cb1 100644 --- a/app/workers/repository_import_worker.rb +++ b/app/workers/repository_import_worker.rb @@ -1,8 +1,7 @@ class RepositoryImportWorker ImportError = Class.new(StandardError) - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker include ExceptionBacktrace include ProjectStartImport @@ -17,11 +16,16 @@ class RepositoryImportWorker import_url: project.import_url, path: project.full_path) - result = Projects::ImportService.new(project, project.creator).execute + service = Projects::ImportService.new(project, project.creator) + result = service.execute + + # Some importers may perform their work asynchronously. In this case it's up + # to those importers to mark the import process as complete. + return if service.async? + raise ImportError, result[:message] if result[:status] == :error - project.repository.after_import - project.import_finish + project.after_import rescue ImportError => ex fail_import(project, ex.message) raise diff --git a/app/workers/requests_profiles_worker.rb b/app/workers/requests_profiles_worker.rb index 703b025d76e..55c236e9e9d 100644 --- a/app/workers/requests_profiles_worker.rb +++ b/app/workers/requests_profiles_worker.rb @@ -1,5 +1,5 @@ class RequestsProfilesWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/schedule_update_user_activity_worker.rb b/app/workers/schedule_update_user_activity_worker.rb index 6c2c3e437f3..d9376577597 100644 --- a/app/workers/schedule_update_user_activity_worker.rb +++ b/app/workers/schedule_update_user_activity_worker.rb @@ -1,5 +1,5 @@ class ScheduleUpdateUserActivityWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform(batch_size = 500) diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb index c301cea5ad6..69f2318d83b 100644 --- a/app/workers/stage_update_worker.rb +++ b/app/workers/stage_update_worker.rb @@ -1,5 +1,5 @@ class StageUpdateWorker - include Sidekiq::Worker + include ApplicationWorker include PipelineQueue enqueue_in group: :processing diff --git a/app/workers/storage_migrator_worker.rb b/app/workers/storage_migrator_worker.rb index b48ead799b9..f92421a667d 100644 --- a/app/workers/storage_migrator_worker.rb +++ b/app/workers/storage_migrator_worker.rb @@ -1,6 +1,5 @@ class StorageMigratorWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker BATCH_SIZE = 100 diff --git a/app/workers/stuck_ci_jobs_worker.rb b/app/workers/stuck_ci_jobs_worker.rb index 269776a1f62..fb26fa4c515 100644 --- a/app/workers/stuck_ci_jobs_worker.rb +++ b/app/workers/stuck_ci_jobs_worker.rb @@ -1,5 +1,5 @@ class StuckCiJobsWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue EXCLUSIVE_LEASE_KEY = 'stuck_ci_builds_worker_lease'.freeze @@ -39,14 +39,23 @@ class StuckCiJobsWorker def drop_stuck(status, timeout) search(status, timeout) do |build| return unless build.stuck? + drop_build :stuck, build, status, timeout end end def search(status, timeout) - builds = Ci::Build.where(status: status).where('ci_builds.updated_at < ?', timeout.ago) - builds.joins(:project).merge(Project.without_deleted).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build| - yield(build) + loop do + jobs = Ci::Build.where(status: status) + .where('ci_builds.updated_at < ?', timeout.ago) + .includes(:tags, :runner, project: :namespace) + .limit(100) + .to_a + break if jobs.empty? + + jobs.each do |job| + yield(job) + end end end diff --git a/app/workers/stuck_import_jobs_worker.rb b/app/workers/stuck_import_jobs_worker.rb index f850e459cd9..e0e6d1418de 100644 --- a/app/workers/stuck_import_jobs_worker.rb +++ b/app/workers/stuck_import_jobs_worker.rb @@ -1,5 +1,5 @@ class StuckImportJobsWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue IMPORT_JOBS_EXPIRATION = 15.hours.to_i diff --git a/app/workers/stuck_merge_jobs_worker.rb b/app/workers/stuck_merge_jobs_worker.rb index a396c0f27b2..36d2a2e6466 100644 --- a/app/workers/stuck_merge_jobs_worker.rb +++ b/app/workers/stuck_merge_jobs_worker.rb @@ -1,5 +1,5 @@ class StuckMergeJobsWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/system_hook_push_worker.rb b/app/workers/system_hook_push_worker.rb index e43bbe35de9..ceeaaf8d189 100644 --- a/app/workers/system_hook_push_worker.rb +++ b/app/workers/system_hook_push_worker.rb @@ -1,6 +1,5 @@ class SystemHookPushWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(push_data, hook_id) SystemHooksService.new.execute_hooks(push_data, hook_id) diff --git a/app/workers/trending_projects_worker.rb b/app/workers/trending_projects_worker.rb index 0531630d13a..7eb65452a7d 100644 --- a/app/workers/trending_projects_worker.rb +++ b/app/workers/trending_projects_worker.rb @@ -1,5 +1,5 @@ class TrendingProjectsWorker - include Sidekiq::Worker + include ApplicationWorker include CronjobQueue def perform diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb new file mode 100644 index 00000000000..0a2e9b63578 --- /dev/null +++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb @@ -0,0 +1,15 @@ +class UpdateHeadPipelineForMergeRequestWorker + include ApplicationWorker + + sidekiq_options queue: 'pipeline_default' + + def perform(merge_request_id) + merge_request = MergeRequest.find(merge_request_id) + pipeline = Ci::Pipeline.where(project: merge_request.source_project, ref: merge_request.source_branch).last + + return unless pipeline && pipeline.latest? + raise ArgumentError, 'merge request sha does not equal pipeline sha' if merge_request.diff_head_sha != pipeline.sha + + merge_request.update_attribute(:head_pipeline_id, pipeline.id) + end +end diff --git a/app/workers/update_merge_requests_worker.rb b/app/workers/update_merge_requests_worker.rb index 150788ca611..74bb9993275 100644 --- a/app/workers/update_merge_requests_worker.rb +++ b/app/workers/update_merge_requests_worker.rb @@ -1,10 +1,7 @@ class UpdateMergeRequestsWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker - def metrics_tags - @metrics_tags || {} - end + LOG_TIME_THRESHOLD = 90 # seconds def perform(project_id, user_id, oldrev, newrev, ref) project = Project.find_by(id: project_id) @@ -13,11 +10,20 @@ class UpdateMergeRequestsWorker user = User.find_by(id: user_id) return unless user - @metrics_tags = { - project_id: project_id, - user_id: user_id - } + # TODO: remove this benchmarking when we have rich logging + time = Benchmark.measure do + MergeRequests::RefreshService.new(project, user).execute(oldrev, newrev, ref) + end + + args_log = [ + "elapsed=#{time.real}", + "project_id=#{project_id}", + "user_id=#{user_id}", + "oldrev=#{oldrev}", + "newrev=#{newrev}", + "ref=#{ref}" + ].join(',') - MergeRequests::RefreshService.new(project, user).execute(oldrev, newrev, ref) + Rails.logger.info("UpdateMergeRequestsWorker#perform #{args_log}") if time.real > LOG_TIME_THRESHOLD end end diff --git a/app/workers/update_user_activity_worker.rb b/app/workers/update_user_activity_worker.rb index 31bbdb69edb..27ec5cd33fb 100644 --- a/app/workers/update_user_activity_worker.rb +++ b/app/workers/update_user_activity_worker.rb @@ -1,6 +1,5 @@ class UpdateUserActivityWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(pairs) pairs = cast_data(pairs) diff --git a/app/workers/upload_checksum_worker.rb b/app/workers/upload_checksum_worker.rb index 78931f1258f..9222760c031 100644 --- a/app/workers/upload_checksum_worker.rb +++ b/app/workers/upload_checksum_worker.rb @@ -1,6 +1,5 @@ class UploadChecksumWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker def perform(upload_id) upload = Upload.find(upload_id) diff --git a/app/workers/wait_for_cluster_creation_worker.rb b/app/workers/wait_for_cluster_creation_worker.rb index 241ed3901dc..19cdb279aaa 100644 --- a/app/workers/wait_for_cluster_creation_worker.rb +++ b/app/workers/wait_for_cluster_creation_worker.rb @@ -1,5 +1,5 @@ class WaitForClusterCreationWorker - include Sidekiq::Worker + include ApplicationWorker include ClusterQueue def perform(cluster_id) diff --git a/app/workers/web_hook_worker.rb b/app/workers/web_hook_worker.rb index 713c0228040..dfc3f33ad9d 100644 --- a/app/workers/web_hook_worker.rb +++ b/app/workers/web_hook_worker.rb @@ -1,6 +1,5 @@ class WebHookWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue + include ApplicationWorker sidekiq_options retry: 4, dead: false |
