1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# frozen_string_literal: true
# rubocop: disable all
# Read-only model over PostgreSQL's pg_settings view, used to look up server
# configuration values by name.
class PostgresSetting < ActiveRecord::Base
  self.table_name = 'pg_settings'

  # Returns the autovacuum naptime, cast to an Integer number of seconds.
  def self.autovacuum_interval
    row = where(name: 'autovacuum_naptime')
      .select('setting::bigint AS interval')
      .take

    row.interval
  end
end
# Read-only model over PostgreSQL's pg_stat_all_tables view, exposing the
# per-table tuple statistics we need to decide when to vacuum.
class PostgresTableStatistic < ActiveRecord::Base
  self.table_name = 'pg_stat_all_tables'

  # This threshold is based on what GitLab.com can usually replicate in a matter
  # of seconds.
  MAX_DEAD_TUPLES = 100_000

  alias_attribute :dead_tuples, :n_dead_tup
  alias_attribute :live_tuples, :n_live_tup

  # Looks up the statistics row for the given table name, or nil when the
  # table has no statistics entry.
  def self.for_table(table)
    find_by(relname: table)
  end

  # Returns true when the table has accumulated enough dead tuples that a
  # vacuum is warranted.
  def too_many_dead_tuples?
    dead = dead_tuples
    live = live_tuples

    # Stale statistics (zero live tuples recorded) would make the ratio below
    # report true even for a handful of dead tuples, so bail out early.
    return false if live.zero?

    # Small tables: limit dead tuples to 1% of the total number of tuples.
    # Large tables: 1% can be a very large number (millions), so an absolute
    # ceiling is checked first.
    dead >= MAX_DEAD_TUPLES || dead.fdiv(live) >= 0.01
  end
end
# Wraps a single database table so we can ANALYZE/VACUUM it and inspect its
# dead-tuple statistics.
class Table
  def initialize(name)
    @name = name
  end

  # Returns true when the table has accumulated too many dead tuples.
  def too_many_dead_tuples?
    # Statistics are only refreshed when running ANALYZE, so refresh them
    # first to avoid acting on stale numbers.
    analyze

    # The transaction forces the query to the primary even when the EE DB
    # load balancer is in use.
    stats = PostgresTableStatistic.transaction do
      PostgresTableStatistic.for_table(@name)
    end

    stats.too_many_dead_tuples?
  end

  # Refreshes the table's statistics.
  def analyze
    run_sql("ANALYZE #{quoted_table_name}")
  end

  # Reclaims dead tuples and refreshes statistics in one pass.
  def vacuum_analyze
    run_sql("VACUUM ANALYZE #{quoted_table_name}")
  end

  # The table name, quoted safely for use in raw SQL.
  def quoted_table_name
    ActiveRecord::Base.connection.quote_table_name(@name)
  end

  private

  # Executes a raw SQL statement on the current connection.
  def run_sql(statement)
    ActiveRecord::Base.connection.execute(statement)
  end
end
# Read-only model over PostgreSQL's pg_replication_slots view, used to check
# how far replicas are lagging behind the primary.
class ReplicationSlot < ActiveRecord::Base
  self.table_name = 'pg_replication_slots'

  # The pg_xlog_* functions were renamed to pg_wal_* in PostgreSQL 10, so pick
  # the right lag expression for the server version.
  LAG_FUNCTION =
    if Gitlab::Database.postgresql? && Gitlab::Database.version.to_f < 10.0
      'pg_xlog_location_diff(pg_current_xlog_insert_location(), restart_lsn)::bigint'
    else
      'pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn)::bigint'
    end

  MAX_LAG = 100.megabytes

  # Returns true when replication lag is bad enough that migration work should
  # pause until the replicas catch up.
  def self.lag_too_great?
    # We force the use of a transaction here so the query always goes to the
    # primary, even when using the EE DB load balancer.
    sizes = transaction { pluck(LAG_FUNCTION) }

    # Without any replication slots there is nothing that can lag behind.
    return false if sizes.empty?

    too_great = sizes.count { |size| size >= MAX_LAG }

    # If too many replicas are falling behind too much, the availability of a
    # GitLab instance might suffer. To prevent this from happening we require at
    # least 1 replica to have data recent enough: lag is only too great when
    # not even one replica is below MAX_LAG.
    #
    # NOTE: the previous `(sizes.length - too_great) >= 1` returned true
    # whenever at least one replica was healthy — the inverse of the intent.
    (sizes.length - too_great) < 1
  end
end
# Mix EachBatch into Event (presumably the ActiveRecord events model, defined
# elsewhere) so we can iterate over it with `each_batch` below.
Event.send(:include, EachBatch)
# Processes `query` in parallel batches while monitoring database health,
# pausing between groups of batches when replicas lag or dead tuples pile up.
#
# query       - A model/relation responding to #each_batch and #table_name.
# batch_size  - Number of rows per batch.
# concurrency - Number of worker threads (and DB connections) per group.
#
# Yields each batch relation to the given block on a worker thread.
def parallel_migrate(query, batch_size: 1_000, concurrency: 8)
  old_config = ActiveRecord::Base.configurations[Rails.env]
  new_config = old_config.merge('pool' => concurrency)

  # Re-establish the connection with a pool big enough for our worker threads.
  ActiveRecord::Base.establish_connection(new_config)

  begin
    query.each_batch(of: batch_size).each_slice(concurrency) do |slices|
      threads = slices.map do |slice|
        Thread.new do
          # Check a connection out explicitly and return it when the block
          # finishes. Without this, every short-lived worker thread pins a
          # connection until it is reaped, exhausting the pool (which is
          # sized exactly to `concurrency`) after the first few slices.
          ActiveRecord::Base.connection_pool.with_connection do
            yield slice.first
          end
        end
      end

      threads.each(&:join)

      table = Table.new(query.table_name)
      start = Gitlab::Metrics::System.monotonic_time

      # Wait for replication to catch up and for the table's dead tuples to be
      # cleaned up before starting the next group of batches.
      while ReplicationSlot.lag_too_great? || table.too_many_dead_tuples?
        sleep(5)

        # If vacuuming hasn't kicked in yet we'll manually vacuum the table,
        # ensuring we don't wait in this loop for too long.
        #
        # We use `nap time * 2` to make sure we don't start vacuuming right away
        # when autovacuuming is waking up.
        if (Gitlab::Metrics::System.monotonic_time - start) >= (PostgresSetting.autovacuum_interval * 2)
          puts 'MANUALLY VACUUMING'
          table.vacuum_analyze
        end
      end
    end
  ensure
    # Restore the original connection configuration (and pool size).
    ActiveRecord::Base.establish_connection(old_config)
  end
end
# Touch every event's updated_at, 16 batches at a time.
parallel_migrate(Event, concurrency: 16) { |batch| batch.update_all('updated_at = now()') }
|