summaryrefslogtreecommitdiff
path: root/src/test/subscription/t/004_sync.pl
blob: aa7714c533b0cc2c6486e62d5973dae788f5bef8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178

# Copyright (c) 2021-2023, PostgreSQL Global Development Group

# Tests for logical replication table syncing
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Initialize publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;

# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->append_conf('postgresql.conf',
	"wal_retrieve_retry_interval = 1ms");
$node_subscriber->start;

# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
	"CREATE TABLE tab_rep (a int primary key)");
$node_publisher->safe_psql('postgres',
	"INSERT INTO tab_rep SELECT generate_series(1,10)");

# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
	"CREATE TABLE tab_rep (a int primary key)");

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
	"CREATE PUBLICATION tap_pub FOR ALL TABLES");

$node_subscriber->safe_psql('postgres',
	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);

# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');

my $result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
is($result, qq(10), 'initial data synced for first sub');

# drop subscription so that there is unreplicated data
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");

$node_publisher->safe_psql('postgres',
	"INSERT INTO tab_rep SELECT generate_series(11,20)");

# recreate the subscription, it will try to do initial copy
$node_subscriber->safe_psql('postgres',
	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);

# but it will be stuck on data copy as it will fail on constraint
my $started_query = "SELECT srsubstate = 'd' FROM pg_subscription_rel;";
$node_subscriber->poll_query_until('postgres', $started_query)
  or die "Timed out while waiting for subscriber to start sync";

# remove the conflicting data
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");

# wait for sync to finish this time
$node_subscriber->wait_for_subscription_sync;

# check that all data is synced
$result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
is($result, qq(20), 'initial data synced for second sub');

# now check another subscription for the same node pair
$node_subscriber->safe_psql('postgres',
	"CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)"
);

# wait for it to start
$node_subscriber->poll_query_until('postgres',
	"SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL"
) or die "Timed out while waiting for subscriber to start";

# and drop both subscriptions
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2");

# check subscriptions are removed
$result = $node_subscriber->safe_psql('postgres',
	"SELECT count(*) FROM pg_subscription");
is($result, qq(0), 'second and third sub are dropped');

# remove the conflicting data
$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");

# recreate the subscription again
$node_subscriber->safe_psql('postgres',
	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);

# and wait for data sync to finish again
$node_subscriber->wait_for_subscription_sync;

# check that all data is synced
$result =
  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
is($result, qq(20), 'initial data synced for fourth sub');

# add new table on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rep_next (a int)");

# setup structure with existing data on publisher
$node_publisher->safe_psql('postgres',
	"CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)");

$node_publisher->wait_for_catchup('tap_sub');

$result = $node_subscriber->safe_psql('postgres',
	"SELECT count(*) FROM tab_rep_next");
is($result, qq(0), 'no data for table added after subscription initialized');

# ask for data sync
$node_subscriber->safe_psql('postgres',
	"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");

# wait for sync to finish
$node_subscriber->wait_for_subscription_sync;

$result = $node_subscriber->safe_psql('postgres',
	"SELECT count(*) FROM tab_rep_next");
is($result, qq(10),
	'data for table added after subscription initialized are now synced');

# Add some data
$node_publisher->safe_psql('postgres',
	"INSERT INTO tab_rep_next SELECT generate_series(1,10)");

$node_publisher->wait_for_catchup('tap_sub');

$result = $node_subscriber->safe_psql('postgres',
	"SELECT count(*) FROM tab_rep_next");
is($result, qq(20),
	'changes for table added after subscription initialized replicated');

# clean up
$node_publisher->safe_psql('postgres', "DROP TABLE tab_rep_next");
$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rep_next");
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");

# Table tab_rep already has the same records on both publisher and subscriber
# at this time. Recreate the subscription which will do the initial copy of
# the table again and fails due to unique constraint violation.
$node_subscriber->safe_psql('postgres',
	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);

$result = $node_subscriber->poll_query_until('postgres', $started_query)
  or die "Timed out while waiting for subscriber to start sync";

# DROP SUBSCRIPTION must clean up slots on the publisher side when the
# subscriber is stuck on data copy for constraint violation.
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");

# When DROP SUBSCRIPTION tries to drop the tablesync slot, the slot may not
# have been created, which causes the slot to be created after the DROP
# SUSCRIPTION finishes. Such slots eventually get dropped at walsender exit
# time. So, to prevent being affected by such ephemeral tablesync slots, we
# wait until all the slots have been cleaned.
ok( $node_publisher->poll_query_until(
		'postgres', 'SELECT count(*) = 0 FROM pg_replication_slots'),
	'DROP SUBSCRIPTION during error can clean up the slots on the publisher');

$node_subscriber->stop('fast');
$node_publisher->stop('fast');

done_testing();