summaryrefslogtreecommitdiff
path: root/app/services/ci/register_job_service.rb
blob: 90341b26fd659b9867091c59176844d75b8e3fe7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# frozen_string_literal: true

module Ci
  # This class responsible for assigning
  # proper pending build to runner on runner API request
  class RegisterJobService
    attr_reader :runner, :metrics

    TEMPORARY_LOCK_TIMEOUT = 3.seconds

    Result = Struct.new(:build, :build_json, :valid?)

    ##
    # The queue depth limit number has been determined by observing 95
    # percentile of effective queue depth on gitlab.com. This is only likely to
    # affect 5% of the worst case scenarios.
    MAX_QUEUE_DEPTH = 45

    def initialize(runner)
      @runner = runner
      @metrics = ::Gitlab::Ci::Queue::Metrics.new(runner)
    end

    def execute(params = {})
      @metrics.increment_queue_operation(:queue_attempt)

      @metrics.observe_queue_time(:process, @runner.runner_type) do
        process_queue(params)
      end
    end

    private

    def process_queue(params)
      valid = true
      depth = 0

      each_build(params) do |build|
        depth += 1
        @metrics.increment_queue_operation(:queue_iteration)

        if depth > max_queue_depth
          @metrics.increment_queue_operation(:queue_depth_limit)

          valid = false

          break
        end

        # We read builds from replicas
        # It is likely that some other concurrent connection is processing
        # a given build at a given moment. To avoid an expensive compute
        # we perform an exclusive lease on Redis to acquire a build temporarily
        unless acquire_temporary_lock(build.id)
          @metrics.increment_queue_operation(:build_temporary_locked)

          # We failed to acquire lock
          # - our queue is not complete as some resources are locked temporarily
          # - we need to re-process it again to ensure that all builds are handled
          valid = false

          next
        end

        result = process_build(build, params)
        next unless result

        if result.valid?
          @metrics.register_success(result.build)
          @metrics.observe_queue_depth(:found, depth)

          return result # rubocop:disable Cop/AvoidReturnFromBlocks
        else
          # The usage of valid: is described in
          # handling of ActiveRecord::StaleObjectError
          valid = false
        end
      end

      @metrics.increment_queue_operation(:queue_conflict) unless valid
      @metrics.observe_queue_depth(:conflict, depth) unless valid
      @metrics.observe_queue_depth(:not_found, depth) if valid
      @metrics.register_failure

      Result.new(nil, nil, valid)
    end

    # rubocop: disable CodeReuse/ActiveRecord
    def each_build(params, &blk)
      builds =
        if runner.instance_type?
          builds_for_shared_runner
        elsif runner.group_type?
          builds_for_group_runner
        else
          builds_for_project_runner
        end

      # pick builds that does not have other tags than runner's one
      builds = builds.matches_tag_ids(runner.tags.ids)

      # pick builds that have at least one tag
      unless runner.run_untagged?
        builds = builds.with_any_tags
      end

      # pick builds that older than specified age
      if params.key?(:job_age)
        builds = builds.queued_before(params[:job_age].seconds.ago)
      end

      if Feature.enabled?(:ci_register_job_service_one_by_one, runner, default_enabled: true)
        build_ids = retrieve_queue(-> { builds.pluck(:id) })

        @metrics.observe_queue_size(-> { build_ids.size }, @runner.runner_type)

        build_ids.each do |build_id|
          yield Ci::Build.find(build_id)
        end
      else
        builds_array = retrieve_queue(-> { builds.to_a })

        @metrics.observe_queue_size(-> { builds_array.size }, @runner.runner_type)

        builds_array.each(&blk)
      end
    end
    # rubocop: enable CodeReuse/ActiveRecord

    def retrieve_queue(queue_query_proc)
      @metrics.observe_queue_time(:retrieve, @runner.runner_type) do
        queue_query_proc.call
      end
    end

    def process_build(build, params)
      unless build.pending?
        @metrics.increment_queue_operation(:build_not_pending)
        return
      end

      if runner.can_pick?(build)
        @metrics.increment_queue_operation(:build_can_pick)
      else
        @metrics.increment_queue_operation(:build_not_pick)

        return
      end

      # In case when 2 runners try to assign the same build, second runner will be declined
      # with StateMachines::InvalidTransition or StaleObjectError when doing run! or save method.
      if assign_runner!(build, params)
        present_build!(build)
      end
    rescue ActiveRecord::StaleObjectError
      # We are looping to find another build that is not conflicting
      # It also indicates that this build can be picked and passed to runner.
      # If we don't do it, basically a bunch of runners would be competing for a build
      # and thus we will generate a lot of 409. This will increase
      # the number of generated requests, also will reduce significantly
      # how many builds can be picked by runner in a unit of time.
      # In case we hit the concurrency-access lock,
      # we still have to return 409 in the end,
      # to make sure that this is properly handled by runner.
      @metrics.increment_queue_operation(:build_conflict_lock)

      Result.new(nil, nil, false)
    rescue StateMachines::InvalidTransition
      @metrics.increment_queue_operation(:build_conflict_transition)

      Result.new(nil, nil, false)
    rescue => ex
      @metrics.increment_queue_operation(:build_conflict_exception)

      # If an error (e.g. GRPC::DeadlineExceeded) occurred constructing
      # the result, consider this as a failure to be retried.
      scheduler_failure!(build)
      track_exception_for_build(ex, build)

      # skip, and move to next one
      nil
    end

    def max_queue_depth
      @max_queue_depth ||= begin
        if Feature.enabled?(:gitlab_ci_builds_queue_limit, runner, default_enabled: true)
          MAX_QUEUE_DEPTH
        else
          ::Gitlab::Database::MAX_INT_VALUE
        end
      end
    end

    # Force variables evaluation to occur now
    def present_build!(build)
      # We need to use the presenter here because Gitaly calls in the presenter
      # may fail, and we need to ensure the response has been generated.
      presented_build = ::Ci::BuildRunnerPresenter.new(build) # rubocop:disable CodeReuse/Presenter
      build_json = ::API::Entities::JobRequest::Response.new(presented_build).to_json
      Result.new(build, build_json, true)
    end

    def assign_runner!(build, params)
      build.runner_id = runner.id
      build.runner_session_attributes = params[:session] if params[:session].present?

      failure_reason, _ = pre_assign_runner_checks.find { |_, check| check.call(build, params) }

      if failure_reason
        @metrics.increment_queue_operation(:runner_pre_assign_checks_failed)

        build.drop!(failure_reason)
      else
        @metrics.increment_queue_operation(:runner_pre_assign_checks_success)

        build.run!
      end

      !failure_reason
    end

    def acquire_temporary_lock(build_id)
      return true unless Feature.enabled?(:ci_register_job_temporary_lock, runner)

      key = "build/register/#{build_id}"

      Gitlab::ExclusiveLease
        .new(key, timeout: TEMPORARY_LOCK_TIMEOUT.to_i)
        .try_obtain
    end

    def scheduler_failure!(build)
      Gitlab::OptimisticLocking.retry_lock(build, 3, name: 'register_job_scheduler_failure') do |subject|
        subject.drop!(:scheduler_failure)
      end
    rescue => ex
      build.doom!

      # This requires extra exception, otherwise we would loose information
      # why we cannot perform `scheduler_failure`
      track_exception_for_build(ex, build)
    end

    def track_exception_for_build(ex, build)
      Gitlab::ErrorTracking.track_exception(ex,
        build_id: build.id,
        build_name: build.name,
        build_stage: build.stage,
        pipeline_id: build.pipeline_id,
        project_id: build.project_id
      )
    end

    # rubocop: disable CodeReuse/ActiveRecord
    def builds_for_shared_runner
      new_builds.
        # don't run projects which have not enabled shared runners and builds
        joins(:project).where(projects: { shared_runners_enabled: true, pending_delete: false })
        .joins('LEFT JOIN project_features ON ci_builds.project_id = project_features.project_id')
        .where('project_features.builds_access_level IS NULL or project_features.builds_access_level > 0').

      # Implement fair scheduling
      # this returns builds that are ordered by number of running builds
      # we prefer projects that don't use shared runners at all
      joins("LEFT JOIN (#{running_builds_for_shared_runners.to_sql}) AS project_builds ON ci_builds.project_id=project_builds.project_id")
        .order(Arel.sql('COALESCE(project_builds.running_builds, 0) ASC'), 'ci_builds.id ASC')
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # rubocop: disable CodeReuse/ActiveRecord
    def builds_for_project_runner
      new_builds.where(project: runner.projects.without_deleted.with_builds_enabled).order('id ASC')
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # rubocop: disable CodeReuse/ActiveRecord
    def builds_for_group_runner
      # Workaround for weird Rails bug, that makes `runner.groups.to_sql` to return `runner_id = NULL`
      groups = ::Group.joins(:runner_namespaces).merge(runner.runner_namespaces)

      hierarchy_groups = Gitlab::ObjectHierarchy.new(groups, options: { use_distinct: Feature.enabled?(:use_distinct_in_register_job_object_hierarchy) }).base_and_descendants
      projects = Project.where(namespace_id: hierarchy_groups)
        .with_group_runners_enabled
        .with_builds_enabled
        .without_deleted
      new_builds.where(project: projects).order('id ASC')
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # rubocop: disable CodeReuse/ActiveRecord
    def running_builds_for_shared_runners
      Ci::Build.running.where(runner: Ci::Runner.instance_type)
        .group(:project_id).select(:project_id, 'count(*) AS running_builds')
    end
    # rubocop: enable CodeReuse/ActiveRecord

    def new_builds
      builds = Ci::Build.pending.unstarted
      builds = builds.ref_protected if runner.ref_protected?
      builds
    end

    def pre_assign_runner_checks
      {
        missing_dependency_failure: -> (build, _) { !build.has_valid_build_dependencies? },
        runner_unsupported: -> (build, params) { !build.supported_runner?(params.dig(:info, :features)) },
        archived_failure: -> (build, _) { build.archived? }
      }
    end
  end
end

Ci::RegisterJobService.prepend_if_ee('EE::Ci::RegisterJobService')