From 509910b89f636f95d2d5a9cd3f38ce8f7f4f47a6 Mon Sep 17 00:00:00 2001 From: Yorick Peterse Date: Fri, 7 Oct 2016 15:20:57 +0200 Subject: Process commits in a separate worker This moves the code used for processing commits from GitPushService to its own Sidekiq worker: ProcessCommitWorker. Using a Sidekiq worker allows us to process multiple commits in parallel. This in turn will lead to issues being closed faster and cross references being created faster. Furthermore by isolating this code into a separate class it's easier to test and maintain the code. The new worker also ensures it can efficiently check which issues can be closed, without having to run numerous SQL queries for every issue. --- app/models/issue_collection.rb | 2 + app/policies/issue_policy.rb | 2 +- app/services/git_push_service.rb | 37 +------ app/services/issues/close_service.rb | 13 +++ app/workers/process_commit_worker.rb | 67 +++++++++++++ .../unreleased/process-commits-using-sidekiq.yml | 4 + config/sidekiq_queues.yml | 1 + spec/models/issue_collection_spec.rb | 9 ++ spec/services/git_push_service_spec.rb | 9 ++ spec/services/issues/close_service_spec.rb | 49 ++++++--- spec/workers/process_commit_worker_spec.rb | 109 +++++++++++++++++++++ 11 files changed, 251 insertions(+), 51 deletions(-) create mode 100644 app/workers/process_commit_worker.rb create mode 100644 changelogs/unreleased/process-commits-using-sidekiq.yml create mode 100644 spec/workers/process_commit_worker_spec.rb diff --git a/app/models/issue_collection.rb b/app/models/issue_collection.rb index df36252a5b1..f0b7d9914c8 100644 --- a/app/models/issue_collection.rb +++ b/app/models/issue_collection.rb @@ -32,6 +32,8 @@ class IssueCollection end end + alias_method :visible_to, :updatable_by_user + private def project_ids diff --git a/app/policies/issue_policy.rb b/app/policies/issue_policy.rb index f3ede58a001..52fa33bc4b0 100644 --- a/app/policies/issue_policy.rb +++ b/app/policies/issue_policy.rb @@ -18,6 +18,6 @@ class IssuePolicy < IssuablePolicy def can_read_confidential? return false unless @user - IssueCollection.new([@subject]).updatable_by_user(@user).any? + IssueCollection.new([@subject]).visible_to(@user).any? end end diff --git a/app/services/git_push_service.rb b/app/services/git_push_service.rb index e8415862de5..de313095bed 100644 --- a/app/services/git_push_service.rb +++ b/app/services/git_push_service.rb @@ -105,35 +105,11 @@ class GitPushService < BaseService # Extract any GFM references from the pushed commit messages. If the configured issue-closing regex is matched, # close the referenced Issue. Create cross-reference Notes corresponding to any other referenced Mentionables. def process_commit_messages - is_default_branch = is_default_branch? - - authors = Hash.new do |hash, commit| - email = commit.author_email - next hash[email] if hash.has_key?(email) - - hash[email] = commit_user(commit) - end + default = is_default_branch? @push_commits.each do |commit| - # Keep track of the issues that will be actually closed because they are on a default branch. - # Hence, when creating cross-reference notes, the not-closed issues (on non-default branches) - # will also have cross-reference. - closed_issues = [] - - if is_default_branch - # Close issues if these commits were pushed to the project's default branch and the commit message matches the - # closing regex. Exclude any mentioned Issues from cross-referencing even if the commits are being pushed to - # a different branch. - closed_issues = commit.closes_issues(current_user) - closed_issues.each do |issue| - if can?(current_user, :update_issue, issue) - Issues::CloseService.new(project, authors[commit], {}).execute(issue, commit: commit) - end - end - end - - commit.create_cross_references!(authors[commit], closed_issues) - update_issue_metrics(commit, authors) + ProcessCommitWorker. + perform_async(project.id, current_user.id, commit.id, default) end end @@ -176,11 +152,4 @@ class GitPushService < BaseService def branch_name @branch_name ||= Gitlab::Git.ref_name(params[:ref]) end - - def update_issue_metrics(commit, authors) - mentioned_issues = commit.all_references(authors[commit]).issues - - Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil). - update_all(first_mentioned_in_commit_at: commit.committed_date) - end end diff --git a/app/services/issues/close_service.rb b/app/services/issues/close_service.rb index 45cca216ccc..ab4c51386a4 100644 --- a/app/services/issues/close_service.rb +++ b/app/services/issues/close_service.rb @@ -1,8 +1,21 @@ module Issues class CloseService < Issues::BaseService + # Closes the supplied issue if the current user is able to do so. def execute(issue, commit: nil, notifications: true, system_note: true) return issue unless can?(current_user, :update_issue, issue) + close_issue(issue, + commit: commit, + notifications: notifications, + system_note: system_note) + end + + # Closes the supplied issue without checking if the user is authorized to + # do so. + # + # The code calling this method is responsible for ensuring that a user is + # allowed to close the given issue. + def close_issue(issue, commit: nil, notifications: true, system_note: true) if project.jira_tracker? && project.jira_service.active project.jira_service.execute(commit, issue) todo_service.close_issue(issue, current_user) diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb new file mode 100644 index 00000000000..071741fbacd --- /dev/null +++ b/app/workers/process_commit_worker.rb @@ -0,0 +1,67 @@ +# Worker for processing individiual commit messages pushed to a repository. +# +# Jobs for this worker are scheduled for every commit that is being pushed. As a +# result of this the workload of this worker should be kept to a bare minimum. +# Consider using an extra worker if you need to add any extra (and potentially +# slow) processing of commits. +class ProcessCommitWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + # project_id - The ID of the project this commit belongs to. + # user_id - The ID of the user that pushed the commit. + # commit_sha - The SHA1 of the commit to process. + # default - The data was pushed to the default branch. + def perform(project_id, user_id, commit_sha, default = false) + project = Project.find_by(id: project_id) + + return unless project + + user = User.find_by(id: user_id) + + return unless user + + commit = find_commit(project, commit_sha) + + return unless commit + + author = commit.author || user + + process_commit_message(project, commit, user, author, default) + + update_issue_metrics(commit, author) + end + + def process_commit_message(project, commit, user, author, default = false) + closed_issues = default ? commit.closes_issues(user) : [] + + unless closed_issues.empty? + close_issues(project, user, author, commit, closed_issues) + end + + commit.create_cross_references!(author, closed_issues) + end + + def close_issues(project, user, author, commit, issues) + # We don't want to run permission related queries for every single issue, + # therefor we use IssueCollection here and skip the authorization check in + # Issues::CloseService#execute. + IssueCollection.new(issues).updatable_by_user(user).each do |issue| + Issues::CloseService.new(project, author). + close_issue(issue, commit: commit) + end + end + + def update_issue_metrics(commit, author) + mentioned_issues = commit.all_references(author).issues + + Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil). + update_all(first_mentioned_in_commit_at: commit.committed_date) + end + + private + + def find_commit(project, sha) + project.commit(sha) + end +end diff --git a/changelogs/unreleased/process-commits-using-sidekiq.yml b/changelogs/unreleased/process-commits-using-sidekiq.yml new file mode 100644 index 00000000000..9f596e6a584 --- /dev/null +++ b/changelogs/unreleased/process-commits-using-sidekiq.yml @@ -0,0 +1,4 @@ +--- +title: Process commits using a dedicated Sidekiq worker +merge_request: 6802 +author: diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index f36fe893fd0..0aec8aedf72 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -21,6 +21,7 @@ - [post_receive, 5] - [merge, 5] - [update_merge_requests, 3] + - [process_commit, 2] - [new_note, 2] - [build, 2] - [pipeline, 2] diff --git a/spec/models/issue_collection_spec.rb b/spec/models/issue_collection_spec.rb index d9ab397c302..d742c814680 100644 --- a/spec/models/issue_collection_spec.rb +++ b/spec/models/issue_collection_spec.rb @@ -55,4 +55,13 @@ describe IssueCollection do end end end + + describe '#visible_to' do + it 'is an alias for updatable_by_user' do + updatable_by_user = described_class.instance_method(:updatable_by_user) + visible_to = described_class.instance_method(:visible_to) + + expect(visible_to).to eq(updatable_by_user) + end + end end diff --git a/spec/services/git_push_service_spec.rb b/spec/services/git_push_service_spec.rb index 1c7a3d04b09..cea7e6429f9 100644 --- a/spec/services/git_push_service_spec.rb +++ b/spec/services/git_push_service_spec.rb @@ -302,6 +302,9 @@ describe GitPushService, services: true do author_email: commit_author.email ) + allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit). + and_return(commit) + allow(project.repository).to receive(:commits_between).and_return([commit]) end @@ -357,6 +360,9 @@ describe GitPushService, services: true do committed_date: commit_time ) + allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit). + and_return(commit) + allow(project.repository).to receive(:commits_between).and_return([commit]) end @@ -393,6 +399,9 @@ describe GitPushService, services: true do allow(project.repository).to receive(:commits_between). and_return([closing_commit]) + allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit). + and_return(closing_commit) + project.team << [commit_author, :master] end diff --git a/spec/services/issues/close_service_spec.rb b/spec/services/issues/close_service_spec.rb index 5dfb33f4b28..4465f22a001 100644 --- a/spec/services/issues/close_service_spec.rb +++ b/spec/services/issues/close_service_spec.rb @@ -15,10 +15,39 @@ describe Issues::CloseService, services: true do end describe '#execute' do + let(:service) { described_class.new(project, user) } + + it 'checks if the user is authorized to update the issue' do + expect(service).to receive(:can?).with(user, :update_issue, issue). + and_call_original + + service.execute(issue) + end + + it 'does not close the issue when the user is not authorized to do so' do + allow(service).to receive(:can?).with(user, :update_issue, issue). + and_return(false) + + expect(service).not_to receive(:close_issue) + expect(service.execute(issue)).to eq(issue) + end + + it 'closes the issue when the user is authorized to do so' do + allow(service).to receive(:can?).with(user, :update_issue, issue). + and_return(true) + + expect(service).to receive(:close_issue). + with(issue, commit: nil, notifications: true, system_note: true) + + service.execute(issue) + end + end + + describe '#close_issue' do context "valid params" do before do perform_enqueued_jobs do - described_class.new(project, user).execute(issue) + described_class.new(project, user).close_issue(issue) end end @@ -41,24 +70,12 @@ describe Issues::CloseService, services: true do end end - context 'current user is not authorized to close issue' do - before do - perform_enqueued_jobs do - described_class.new(project, guest).execute(issue) - end - end - - it 'does not close the issue' do - expect(issue).to be_open - end - end - context 'when issue is not confidential' do it 'executes issue hooks' do expect(project).to receive(:execute_hooks).with(an_instance_of(Hash), :issue_hooks) expect(project).to receive(:execute_services).with(an_instance_of(Hash), :issue_hooks) - described_class.new(project, user).execute(issue) + described_class.new(project, user).close_issue(issue) end end @@ -69,14 +86,14 @@ describe Issues::CloseService, services: true do expect(project).to receive(:execute_hooks).with(an_instance_of(Hash), :confidential_issue_hooks) expect(project).to receive(:execute_services).with(an_instance_of(Hash), :confidential_issue_hooks) - described_class.new(project, user).execute(issue) + described_class.new(project, user).close_issue(issue) end end context 'external issue tracker' do before do allow(project).to receive(:default_issues_tracker?).and_return(false) - described_class.new(project, user).execute(issue) + described_class.new(project, user).close_issue(issue) end it { expect(issue).to be_valid } diff --git a/spec/workers/process_commit_worker_spec.rb b/spec/workers/process_commit_worker_spec.rb new file mode 100644 index 00000000000..3e4fee42240 --- /dev/null +++ b/spec/workers/process_commit_worker_spec.rb @@ -0,0 +1,109 @@ +require 'spec_helper' + +describe ProcessCommitWorker do + let(:worker) { described_class.new } + let(:user) { create(:user) } + let(:project) { create(:project, :public) } + let(:issue) { create(:issue, project: project, author: user) } + let(:commit) { project.commit } + + describe '#perform' do + it 'does not process the commit when the project does not exist' do + expect(worker).not_to receive(:close_issues) + + worker.perform(-1, user.id, commit.id) + end + + it 'does not process the commit when the user does not exist' do + expect(worker).not_to receive(:close_issues) + + worker.perform(project.id, -1, commit.id) + end + + it 'does not process the commit when the commit no longer exists' do + expect(worker).not_to receive(:close_issues) + + worker.perform(project.id, user.id, 'this-should-does-not-exist') + end + + it 'processes the commit message' do + expect(worker).to receive(:process_commit_message).and_call_original + + worker.perform(project.id, user.id, commit.id) + end + + it 'updates the issue metrics' do + expect(worker).to receive(:update_issue_metrics).and_call_original + + worker.perform(project.id, user.id, commit.id) + end + end + + describe '#process_commit_message' do + context 'when pushing to the default branch' do + it 'closes issues that should be closed per the commit message' do + allow(commit).to receive(:safe_message). + and_return("Closes #{issue.to_reference}") + + expect(worker).to receive(:close_issues). + with(project, user, user, commit, [issue]) + + worker.process_commit_message(project, commit, user, user, true) + end + end + + context 'when pushing to a non-default branch' do + it 'does not close any issues' do + allow(commit).to receive(:safe_message). + and_return("Closes #{issue.to_reference}") + + expect(worker).not_to receive(:close_issues) + + worker.process_commit_message(project, commit, user, user, false) + end + end + + it 'creates cross references' do + expect(commit).to receive(:create_cross_references!) + + worker.process_commit_message(project, commit, user, user) + end + end + + describe '#close_issues' do + context 'when the user can update the issues' do + it 'closes the issues' do + worker.close_issues(project, user, user, commit, [issue]) + + issue.reload + + expect(issue.closed?).to eq(true) + end + end + + context 'when the user can not update the issues' do + it 'does not close the issues' do + other_user = create(:user) + + worker.close_issues(project, other_user, other_user, commit, [issue]) + + issue.reload + + expect(issue.closed?).to eq(false) + end + end + end + + describe '#update_issue_metrics' do + it 'updates any existing issue metrics' do + allow(commit).to receive(:safe_message). + and_return("Closes #{issue.to_reference}") + + worker.update_issue_metrics(commit, user) + + metric = Issue::Metrics.first + + expect(metric.first_mentioned_in_commit_at).to eq(commit.committed_date) + end + end +end -- cgit v1.2.1