summaryrefslogtreecommitdiff
path: root/src/backend/utils/sort/sharedtuplestore.c
blob: 996cef07d4fe1bcbaa8ab720df4fe30758a6c92f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
/*-------------------------------------------------------------------------
 *
 * sharedtuplestore.c
 *	  Simple mechanism for sharing tuples between backends.
 *
 * This module contains a shared temporary tuple storage mechanism providing
 * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
 * can write to a SharedTuplestore, and then multiple backends can later scan
 * the stored tuples.  Currently, the only scan type supported is a parallel
 * scan where each backend reads an arbitrary subset of the tuples that were
 * written.
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/sort/sharedtuplestore.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup.h"
#include "access/htup_details.h"
#include "miscadmin.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"

/*
 * The size of chunks, in pages.  This is somewhat arbitrarily set to match
 * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
 * at approximately the same rate as it allocates new chunks of memory to
 * insert them into.
 */
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)

/* Chunk written to disk. */
typedef struct SharedTuplestoreChunk
{
	int			ntuples;		/* Number of tuples in this chunk. */
	int			overflow;		/* If overflow, how many including this one? */
	char		data[FLEXIBLE_ARRAY_MEMBER];
} SharedTuplestoreChunk;

/* Per-participant shared state. */
typedef struct SharedTuplestoreParticipant
{
	LWLock		lock;
	BlockNumber read_page;		/* Page number for next read. */
	BlockNumber npages;			/* Number of pages written. */
	bool		writing;		/* Used only for assertions. */
} SharedTuplestoreParticipant;

/* The control object that lives in shared memory. */
struct SharedTuplestore
{
	int			nparticipants;	/* Number of participants that can write. */
	int			flags;			/* Flag bits from SHARED_TUPLESTORE_XXX */
	size_t		meta_data_size; /* Size of per-tuple header. */
	char		name[NAMEDATALEN];	/* A name for this tuplestore. */

	/* Followed by per-participant shared state. */
	SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};

/* Per-participant state that lives in backend-local memory. */
struct SharedTuplestoreAccessor
{
	int			participant;	/* My participant number. */
	SharedTuplestore *sts;		/* The shared state. */
	SharedFileSet *fileset;		/* The SharedFileSet holding files. */
	MemoryContext context;		/* Memory context for buffers. */

	/* State for reading. */
	int			read_participant;	/* The current participant to read from. */
	BufFile    *read_file;		/* The current file to read from. */
	int			read_ntuples_available; /* The number of tuples in chunk. */
	int			read_ntuples;	/* How many tuples have we read from chunk? */
	size_t		read_bytes;		/* How many bytes have we read from chunk? */
	char	   *read_buffer;	/* A buffer for loading tuples. */
	size_t		read_buffer_size;
	BlockNumber read_next_page; /* Lowest block we'll consider reading. */

	/* State for writing. */
	SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
	BufFile    *write_file;		/* The current file to write to. */
	BlockNumber write_page;		/* The next page to write to. */
	char	   *write_pointer;	/* Current write pointer within chunk. */
	char	   *write_end;		/* One past the end of the current chunk. */
};

static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
						 int participant);

/*
 * Return the amount of shared memory required to hold SharedTuplestore for a
 * given number of participants.
 */
size_t
sts_estimate(int participants)
{
	return offsetof(SharedTuplestore, participants) +
		sizeof(SharedTuplestoreParticipant) * participants;
}

/*
 * Initialize a SharedTuplestore in existing shared memory.  There must be
 * space for sts_estimate(participants) bytes.  If flags includes the value
 * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
 * eagerly (but this isn't yet implemented).
 *
 * Tuples that are stored may optionally carry a piece of fixed sized
 * meta-data which will be retrieved along with the tuple.  This is useful for
 * the hash values used in multi-batch hash joins, but could have other
 * applications.
 *
 * The caller must supply a SharedFileSet, which is essentially a directory
 * that will be cleaned up automatically, and a name which must be unique
 * across all SharedTuplestores created in the same SharedFileSet.
 */
SharedTuplestoreAccessor *
sts_initialize(SharedTuplestore *sts, int participants,
			   int my_participant_number,
			   size_t meta_data_size,
			   int flags,
			   SharedFileSet *fileset,
			   const char *name)
{
	SharedTuplestoreAccessor *accessor;
	int			i;

	Assert(my_participant_number < participants);

	sts->nparticipants = participants;
	sts->meta_data_size = meta_data_size;
	sts->flags = flags;

	if (strlen(name) > sizeof(sts->name) - 1)
		elog(ERROR, "SharedTuplestore name too long");
	strcpy(sts->name, name);

	/*
	 * Limit meta-data so it + tuple size always fits into a single chunk.
	 * sts_puttuple() and sts_read_tuple() could be made to support scenarios
	 * where that's not the case, but it's not currently required. If so,
	 * meta-data size probably should be made variable, too.
	 */
	if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
		elog(ERROR, "meta-data too long");

	for (i = 0; i < participants; ++i)
	{
		LWLockInitialize(&sts->participants[i].lock,
						 LWTRANCHE_SHARED_TUPLESTORE);
		sts->participants[i].read_page = 0;
		sts->participants[i].writing = false;
	}

	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
	accessor->participant = my_participant_number;
	accessor->sts = sts;
	accessor->fileset = fileset;
	accessor->context = CurrentMemoryContext;

	return accessor;
}

/*
 * Attach to a SharedTuplestore that has been initialized by another backend,
 * so that this backend can read and write tuples.
 */
SharedTuplestoreAccessor *
sts_attach(SharedTuplestore *sts,
		   int my_participant_number,
		   SharedFileSet *fileset)
{
	SharedTuplestoreAccessor *accessor;

	Assert(my_participant_number < sts->nparticipants);

	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
	accessor->participant = my_participant_number;
	accessor->sts = sts;
	accessor->fileset = fileset;
	accessor->context = CurrentMemoryContext;

	return accessor;
}

static void
sts_flush_chunk(SharedTuplestoreAccessor *accessor)
{
	size_t		size;

	size = STS_CHUNK_PAGES * BLCKSZ;
	BufFileWrite(accessor->write_file, accessor->write_chunk, size);
	memset(accessor->write_chunk, 0, size);
	accessor->write_pointer = &accessor->write_chunk->data[0];
	accessor->sts->participants[accessor->participant].npages +=
		STS_CHUNK_PAGES;
}

/*
 * Finish writing tuples.  This must be called by all backends that have
 * written data before any backend begins reading it.
 */
void
sts_end_write(SharedTuplestoreAccessor *accessor)
{
	if (accessor->write_file != NULL)
	{
		sts_flush_chunk(accessor);
		BufFileClose(accessor->write_file);
		pfree(accessor->write_chunk);
		accessor->write_chunk = NULL;
		accessor->write_file = NULL;
		accessor->sts->participants[accessor->participant].writing = false;
	}
}

/*
 * Prepare to rescan.  Only one participant must call this.  After it returns,
 * all participants may call sts_begin_parallel_scan() and then loop over
 * sts_parallel_scan_next().  This function must not be called concurrently
 * with a scan, and synchronization to avoid that is the caller's
 * responsibility.
 */
void
sts_reinitialize(SharedTuplestoreAccessor *accessor)
{
	int			i;

	/*
	 * Reset the shared read head for all participants' files.  Also set the
	 * initial chunk size to the minimum (any increases from that size will be
	 * recorded in chunk_expansion_log).
	 */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
	{
		accessor->sts->participants[i].read_page = 0;
	}
}

/*
 * Begin scanning the contents in parallel.
 */
void
sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
{
	int			i PG_USED_FOR_ASSERTS_ONLY;

	/* End any existing scan that was in progress. */
	sts_end_parallel_scan(accessor);

	/*
	 * Any backend that might have written into this shared tuplestore must
	 * have called sts_end_write(), so that all buffers are flushed and the
	 * files have stopped growing.
	 */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
		Assert(!accessor->sts->participants[i].writing);

	/*
	 * We will start out reading the file that THIS backend wrote.  There may
	 * be some caching locality advantage to that.
	 */
	accessor->read_participant = accessor->participant;
	accessor->read_file = NULL;
	accessor->read_next_page = 0;
}

/*
 * Finish a parallel scan, freeing associated backend-local resources.
 */
void
sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
{
	/*
	 * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
	 * we'd probably need a reference count of current parallel scanners so we
	 * could safely do it only when the reference count reaches zero.
	 */
	if (accessor->read_file != NULL)
	{
		BufFileClose(accessor->read_file);
		accessor->read_file = NULL;
	}
}

/*
 * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
 * pointer to meta data of that size must be provided.
 */
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
			 MinimalTuple tuple)
{
	size_t		size;

	/* Do we have our own file yet? */
	if (accessor->write_file == NULL)
	{
		SharedTuplestoreParticipant *participant;
		char		name[MAXPGPATH];

		/* Create one.  Only this backend will write into it. */
		sts_filename(name, accessor, accessor->participant);
		accessor->write_file =
			BufFileCreateFileSet(&accessor->fileset->fs, name);

		/* Set up the shared state for this backend's file. */
		participant = &accessor->sts->participants[accessor->participant];
		participant->writing = true;	/* for assertions only */
	}

	/* Do we have space? */
	size = accessor->sts->meta_data_size + tuple->t_len;
	if (accessor->write_pointer + size >= accessor->write_end)
	{
		if (accessor->write_chunk == NULL)
		{
			/* First time through.  Allocate chunk. */
			accessor->write_chunk = (SharedTuplestoreChunk *)
				MemoryContextAllocZero(accessor->context,
									   STS_CHUNK_PAGES * BLCKSZ);
			accessor->write_chunk->ntuples = 0;
			accessor->write_pointer = &accessor->write_chunk->data[0];
			accessor->write_end = (char *)
				accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
		}
		else
		{
			/* See if flushing helps. */
			sts_flush_chunk(accessor);
		}

		/* It may still not be enough in the case of a gigantic tuple. */
		if (accessor->write_pointer + size >= accessor->write_end)
		{
			size_t		written;

			/*
			 * We'll write the beginning of the oversized tuple, and then
			 * write the rest in some number of 'overflow' chunks.
			 *
			 * sts_initialize() verifies that the size of the tuple +
			 * meta-data always fits into a chunk. Because the chunk has been
			 * flushed above, we can be sure to have all of a chunk's usable
			 * space available.
			 */
			Assert(accessor->write_pointer + accessor->sts->meta_data_size +
				   sizeof(uint32) < accessor->write_end);

			/* Write the meta-data as one chunk. */
			if (accessor->sts->meta_data_size > 0)
				memcpy(accessor->write_pointer, meta_data,
					   accessor->sts->meta_data_size);

			/*
			 * Write as much of the tuple as we can fit. This includes the
			 * tuple's size at the start.
			 */
			written = accessor->write_end - accessor->write_pointer -
				accessor->sts->meta_data_size;
			memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
				   tuple, written);
			++accessor->write_chunk->ntuples;
			size -= accessor->sts->meta_data_size;
			size -= written;
			/* Now write as many overflow chunks as we need for the rest. */
			while (size > 0)
			{
				size_t		written_this_chunk;

				sts_flush_chunk(accessor);

				/*
				 * How many overflow chunks to go?  This will allow readers to
				 * skip all of them at once instead of reading each one.
				 */
				accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
					STS_CHUNK_DATA_SIZE;
				written_this_chunk =
					Min(accessor->write_end - accessor->write_pointer, size);
				memcpy(accessor->write_pointer, (char *) tuple + written,
					   written_this_chunk);
				accessor->write_pointer += written_this_chunk;
				size -= written_this_chunk;
				written += written_this_chunk;
			}
			return;
		}
	}

	/* Copy meta-data and tuple into buffer. */
	if (accessor->sts->meta_data_size > 0)
		memcpy(accessor->write_pointer, meta_data,
			   accessor->sts->meta_data_size);
	memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
		   tuple->t_len);
	accessor->write_pointer += size;
	++accessor->write_chunk->ntuples;
}

static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	MinimalTuple tuple;
	uint32		size;
	size_t		remaining_size;
	size_t		this_chunk_size;
	char	   *destination;

	/*
	 * We'll keep track of bytes read from this chunk so that we can detect an
	 * overflowing tuple and switch to reading overflow pages.
	 */
	if (accessor->sts->meta_data_size > 0)
	{
		if (BufFileRead(accessor->read_file,
						meta_data,
						accessor->sts->meta_data_size) !=
			accessor->sts->meta_data_size)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading meta-data.")));
		accessor->read_bytes += accessor->sts->meta_data_size;
	}
	if (BufFileRead(accessor->read_file,
					&size,
					sizeof(size)) != sizeof(size))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from shared tuplestore temporary file"),
				 errdetail_internal("Short read while reading size.")));
	accessor->read_bytes += sizeof(size);
	if (size > accessor->read_buffer_size)
	{
		size_t		new_read_buffer_size;

		if (accessor->read_buffer != NULL)
			pfree(accessor->read_buffer);
		new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
		accessor->read_buffer =
			MemoryContextAlloc(accessor->context, new_read_buffer_size);
		accessor->read_buffer_size = new_read_buffer_size;
	}
	remaining_size = size - sizeof(uint32);
	this_chunk_size = Min(remaining_size,
						  BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
	destination = accessor->read_buffer + sizeof(uint32);
	if (BufFileRead(accessor->read_file,
					destination,
					this_chunk_size) != this_chunk_size)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from shared tuplestore temporary file"),
				 errdetail_internal("Short read while reading tuple.")));
	accessor->read_bytes += this_chunk_size;
	remaining_size -= this_chunk_size;
	destination += this_chunk_size;
	++accessor->read_ntuples;

	/* Check if we need to read any overflow chunks. */
	while (remaining_size > 0)
	{
		/* We are now positioned at the start of an overflow chunk. */
		SharedTuplestoreChunk chunk_header;

		if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
			STS_CHUNK_HEADER_SIZE)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading overflow chunk header.")));
		accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
		if (chunk_header.overflow == 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("unexpected chunk in shared tuplestore temporary file"),
					 errdetail_internal("Expected overflow chunk.")));
		accessor->read_next_page += STS_CHUNK_PAGES;
		this_chunk_size = Min(remaining_size,
							  BLCKSZ * STS_CHUNK_PAGES -
							  STS_CHUNK_HEADER_SIZE);
		if (BufFileRead(accessor->read_file,
						destination,
						this_chunk_size) != this_chunk_size)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading tuple.")));
		accessor->read_bytes += this_chunk_size;
		remaining_size -= this_chunk_size;
		destination += this_chunk_size;

		/*
		 * These will be used to count regular tuples following the oversized
		 * tuple that spilled into this overflow chunk.
		 */
		accessor->read_ntuples = 0;
		accessor->read_ntuples_available = chunk_header.ntuples;
	}

	tuple = (MinimalTuple) accessor->read_buffer;
	tuple->t_len = size;

	return tuple;
}

/*
 * Get the next tuple in the current parallel scan.
 */
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	SharedTuplestoreParticipant *p;
	BlockNumber read_page;
	bool		eof;

	for (;;)
	{
		/* Can we read more tuples from the current chunk? */
		if (accessor->read_ntuples < accessor->read_ntuples_available)
			return sts_read_tuple(accessor, meta_data);

		/* Find the location of a new chunk to read. */
		p = &accessor->sts->participants[accessor->read_participant];

		LWLockAcquire(&p->lock, LW_EXCLUSIVE);
		/* We can skip directly past overflow pages we know about. */
		if (p->read_page < accessor->read_next_page)
			p->read_page = accessor->read_next_page;
		eof = p->read_page >= p->npages;
		if (!eof)
		{
			/* Claim the next chunk. */
			read_page = p->read_page;
			/* Advance the read head for the next reader. */
			p->read_page += STS_CHUNK_PAGES;
			accessor->read_next_page = p->read_page;
		}
		LWLockRelease(&p->lock);

		if (!eof)
		{
			SharedTuplestoreChunk chunk_header;
			size_t		nread;

			/* Make sure we have the file open. */
			if (accessor->read_file == NULL)
			{
				char		name[MAXPGPATH];

				sts_filename(name, accessor, accessor->read_participant);
				accessor->read_file =
					BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
									   false);
			}

			/* Seek and load the chunk header. */
			if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek to block %u in shared tuplestore temporary file",
								read_page)));
			nread = BufFileRead(accessor->read_file, &chunk_header,
								STS_CHUNK_HEADER_SIZE);
			if (nread != STS_CHUNK_HEADER_SIZE)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not read from shared tuplestore temporary file: read only %zu of %zu bytes",
								nread, STS_CHUNK_HEADER_SIZE)));

			/*
			 * If this is an overflow chunk, we skip it and any following
			 * overflow chunks all at once.
			 */
			if (chunk_header.overflow > 0)
			{
				accessor->read_next_page = read_page +
					chunk_header.overflow * STS_CHUNK_PAGES;
				continue;
			}

			accessor->read_ntuples = 0;
			accessor->read_ntuples_available = chunk_header.ntuples;
			accessor->read_bytes = STS_CHUNK_HEADER_SIZE;

			/* Go around again, so we can get a tuple from this chunk. */
		}
		else
		{
			if (accessor->read_file != NULL)
			{
				BufFileClose(accessor->read_file);
				accessor->read_file = NULL;
			}

			/*
			 * Try the next participant's file.  If we've gone full circle,
			 * we're done.
			 */
			accessor->read_participant = (accessor->read_participant + 1) %
				accessor->sts->nparticipants;
			if (accessor->read_participant == accessor->participant)
				break;
			accessor->read_next_page = 0;

			/* Go around again, so we can get a chunk from this file. */
		}
	}

	return NULL;
}

/*
 * Create the name used for the BufFile that a given participant will write.
 */
static void
sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
{
	snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
}