summaryrefslogtreecommitdiff
path: root/src/bin/pg_rewind/local_source.c
blob: 9bd43cba7481659726c23e4ee6c1497973526a11 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/*-------------------------------------------------------------------------
 *
 * local_source.c
 *	  Functions for using a local data directory as the source.
 *
 * Portions Copyright (c) 2013-2023, PostgreSQL Global Development Group
 *
 *-------------------------------------------------------------------------
 */
#include "postgres_fe.h"

#include <fcntl.h>
#include <unistd.h>

#include "datapagemap.h"
#include "file_ops.h"
#include "filemap.h"
#include "pg_rewind.h"
#include "rewind_source.h"

typedef struct
{
	rewind_source common;		/* common interface functions */

	const char *datadir;		/* path to the source data directory */
} local_source;

static void local_traverse_files(rewind_source *source,
								 process_file_callback_t callback);
static char *local_fetch_file(rewind_source *source, const char *path,
							  size_t *filesize);
static void local_queue_fetch_file(rewind_source *source, const char *path,
								   size_t len);
static void local_queue_fetch_range(rewind_source *source, const char *path,
									off_t off, size_t len);
static void local_finish_fetch(rewind_source *source);
static void local_destroy(rewind_source *source);

rewind_source *
init_local_source(const char *datadir)
{
	local_source *src;

	src = pg_malloc0(sizeof(local_source));

	src->common.traverse_files = local_traverse_files;
	src->common.fetch_file = local_fetch_file;
	src->common.queue_fetch_file = local_queue_fetch_file;
	src->common.queue_fetch_range = local_queue_fetch_range;
	src->common.finish_fetch = local_finish_fetch;
	src->common.get_current_wal_insert_lsn = NULL;
	src->common.destroy = local_destroy;

	src->datadir = datadir;

	return &src->common;
}

static void
local_traverse_files(rewind_source *source, process_file_callback_t callback)
{
	traverse_datadir(((local_source *) source)->datadir, callback);
}

static char *
local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
{
	return slurpFile(((local_source *) source)->datadir, path, filesize);
}

/*
 * Copy a file from source to target.
 *
 * 'len' is the expected length of the file.
 */
static void
local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
{
	const char *datadir = ((local_source *) source)->datadir;
	PGIOAlignedBlock buf;
	char		srcpath[MAXPGPATH];
	int			srcfd;
	size_t		written_len;

	snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);

	/* Open source file for reading */
	srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
	if (srcfd < 0)
		pg_fatal("could not open source file \"%s\": %m",
				 srcpath);

	/* Truncate and open the target file for writing */
	open_target_file(path, true);

	written_len = 0;
	for (;;)
	{
		ssize_t		read_len;

		read_len = read(srcfd, buf.data, sizeof(buf));

		if (read_len < 0)
			pg_fatal("could not read file \"%s\": %m", srcpath);
		else if (read_len == 0)
			break;				/* EOF reached */

		write_target_range(buf.data, written_len, read_len);
		written_len += read_len;
	}

	/*
	 * A local source is not expected to change while we're rewinding, so
	 * check that the size of the file matches our earlier expectation.
	 */
	if (written_len != len)
		pg_fatal("size of source file \"%s\" changed concurrently: %d bytes expected, %d copied",
				 srcpath, (int) len, (int) written_len);

	if (close(srcfd) != 0)
		pg_fatal("could not close file \"%s\": %m", srcpath);
}

/*
 * Copy a file from source to target, starting at 'off', for 'len' bytes.
 */
static void
local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
						size_t len)
{
	const char *datadir = ((local_source *) source)->datadir;
	PGIOAlignedBlock buf;
	char		srcpath[MAXPGPATH];
	int			srcfd;
	off_t		begin = off;
	off_t		end = off + len;

	snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);

	srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
	if (srcfd < 0)
		pg_fatal("could not open source file \"%s\": %m",
				 srcpath);

	if (lseek(srcfd, begin, SEEK_SET) == -1)
		pg_fatal("could not seek in source file: %m");

	open_target_file(path, false);

	while (end - begin > 0)
	{
		ssize_t		readlen;
		size_t		thislen;

		if (end - begin > sizeof(buf))
			thislen = sizeof(buf);
		else
			thislen = end - begin;

		readlen = read(srcfd, buf.data, thislen);

		if (readlen < 0)
			pg_fatal("could not read file \"%s\": %m", srcpath);
		else if (readlen == 0)
			pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);

		write_target_range(buf.data, begin, readlen);
		begin += readlen;
	}

	if (close(srcfd) != 0)
		pg_fatal("could not close file \"%s\": %m", srcpath);
}

static void
local_finish_fetch(rewind_source *source)
{
	/*
	 * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
	 */
}

static void
local_destroy(rewind_source *source)
{
	pfree(source);
}