summaryrefslogtreecommitdiff
path: root/src/backend/port/unix_latch.c
blob: 02894976c06aff1281bb6ebaea7febd5c0ee5296 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
/*-------------------------------------------------------------------------
 *
 * unix_latch.c
 *	  Routines for inter-process latches
 *
 * The Unix implementation uses the so-called self-pipe trick to overcome
 * the race condition involved with select() and setting a global flag
 * in the signal handler. When a latch is set and the current process
 * is waiting for it, the signal handler wakes up the select() in
 * WaitLatch by writing a byte to a pipe. A signal by itself doesn't
 * interrupt select() on all platforms, and even on platforms where it
 * does, a signal that arrives just before the select() call does not
 * prevent the select() from entering sleep. An incoming byte on a pipe
 * however reliably interrupts the sleep, and causes select() to return
 * immediately even if the signal arrives before select() begins.
 *
 * (Actually, we prefer poll() over select() where available, but the
 * same comments apply to it.)
 *
 * When SetLatch is called from the same process that owns the latch,
 * SetLatch writes the byte directly to the pipe. If it's owned by another
 * process, SIGUSR1 is sent and the signal handler in the waiting process
 * writes the byte to the pipe on behalf of the signaling process.
 *
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/port/unix_latch.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "miscadmin.h"
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/shmem.h"

/* Are we currently in WaitLatch? The signal handler would like to know. */
static volatile sig_atomic_t waiting = false;

/* Read and write ends of the self-pipe */
static int	selfpipe_readfd = -1;
static int	selfpipe_writefd = -1;

/* private function prototypes */
static void initSelfPipe(void);
static void drainSelfPipe(void);
static void sendSelfPipeByte(void);


/*
 * Initialize a backend-local latch.
 */
void
InitLatch(volatile Latch *latch)
{
	/* Initialize the self-pipe if this is our first latch in the process */
	if (selfpipe_readfd == -1)
		initSelfPipe();

	latch->is_set = false;
	latch->owner_pid = MyProcPid;
	latch->is_shared = false;
}

/*
 * Initialize a shared latch that can be set from other processes. The latch
 * is initially owned by no-one; use OwnLatch to associate it with the
 * current process.
 *
 * InitSharedLatch needs to be called in postmaster before forking child
 * processes, usually right after allocating the shared memory block
 * containing the latch with ShmemInitStruct. (The Unix implementation
 * doesn't actually require that, but the Windows one does.) Because of
 * this restriction, we have no concurrency issues to worry about here.
 */
void
InitSharedLatch(volatile Latch *latch)
{
	latch->is_set = false;
	latch->owner_pid = 0;
	latch->is_shared = true;
}

/*
 * Associate a shared latch with the current process, allowing it to
 * wait on the latch.
 *
 * Although there is a sanity check for latch-already-owned, we don't do
 * any sort of locking here, meaning that we could fail to detect the error
 * if two processes try to own the same latch at about the same time.  If
 * there is any risk of that, caller must provide an interlock to prevent it.
 *
 * In any process that calls OwnLatch(), make sure that
 * latch_sigusr1_handler() is called from the SIGUSR1 signal handler,
 * as shared latches use SIGUSR1 for inter-process communication.
 */
void
OwnLatch(volatile Latch *latch)
{
	Assert(latch->is_shared);

	/* Initialize the self-pipe if this is our first latch in this process */
	if (selfpipe_readfd == -1)
		initSelfPipe();

	/* sanity check */
	if (latch->owner_pid != 0)
		elog(ERROR, "latch already owned");

	latch->owner_pid = MyProcPid;
}

/*
 * Disown a shared latch currently owned by the current process.
 */
void
DisownLatch(volatile Latch *latch)
{
	Assert(latch->is_shared);
	Assert(latch->owner_pid == MyProcPid);

	latch->owner_pid = 0;
}

/*
 * Wait for a given latch to be set, or for postmaster death, or until timeout
 * is exceeded. 'wakeEvents' is a bitmask that specifies which of those events
 * to wait for. If the latch is already set (and WL_LATCH_SET is given), the
 * function returns immediately.
 *
 * The 'timeout' is given in milliseconds. It must be >= 0 if WL_TIMEOUT flag
 * is given.  On some platforms, signals do not interrupt the wait, or even
 * cause the timeout to be restarted, so beware that the function can sleep
 * for several times longer than the requested timeout.  However, this
 * difficulty is not so great as it seems, because the signal handlers for any
 * signals that the caller should respond to ought to be programmed to end the
 * wait by calling SetLatch.  Ideally, the timeout parameter is vestigial.
 *
 * The latch must be owned by the current process, ie. it must be a
 * backend-local latch initialized with InitLatch, or a shared latch
 * associated with the current process by calling OwnLatch.
 *
 * Returns bit mask indicating which condition(s) caused the wake-up. Note
 * that if multiple wake-up conditions are true, there is no guarantee that
 * we return all of them in one call, but we will return at least one. Also,
 * according to the select(2) man page on Linux, select(2) may spuriously
 * return and report a file descriptor as readable, when it's not. We use
 * select(2), so WaitLatch can also spuriously claim that a socket is
 * readable, or postmaster has died, even when none of the wake conditions
 * have been satisfied. That should be rare in practice, but the caller
 * should not use the return value for anything critical, re-checking the
 * situation with PostmasterIsAlive() or read() on a socket as necessary.
 * The latch and timeout flag bits can be trusted, however.
 */
int
WaitLatch(volatile Latch *latch, int wakeEvents, long timeout)
{
	return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout);
}

/*
 * Like WaitLatch, but with an extra socket argument for WL_SOCKET_*
 * conditions.
 */
int
WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
				  long timeout)
{
	int			result = 0;
	int			rc;
#ifdef HAVE_POLL
	struct pollfd pfds[3];
	int			nfds;
#else
	struct timeval tv,
			   *tvp = NULL;
	fd_set		input_mask;
	fd_set		output_mask;
	int			hifd;
#endif

	/* Ignore WL_SOCKET_* events if no valid socket is given */
	if (sock == PGINVALID_SOCKET)
		wakeEvents &= ~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);

	Assert(wakeEvents != 0);	/* must have at least one wake event */

	if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid)
		elog(ERROR, "cannot wait on a latch owned by another process");

	/* Initialize timeout */
	if (wakeEvents & WL_TIMEOUT)
	{
		Assert(timeout >= 0);
#ifndef HAVE_POLL
		tv.tv_sec = timeout / 1000L;
		tv.tv_usec = (timeout % 1000L) * 1000L;
		tvp = &tv;
#endif
	}
	else
	{
#ifdef HAVE_POLL
		/* make sure poll() agrees there is no timeout */
		timeout = -1;
#endif
	}

	waiting = true;
	do
	{
		/*
		 * Clear the pipe, then check if the latch is set already. If someone
		 * sets the latch between this and the poll()/select() below, the
		 * setter will write a byte to the pipe (or signal us and the signal
		 * handler will do that), and the poll()/select() will return
		 * immediately.
		 *
		 * Note: we assume that the kernel calls involved in drainSelfPipe()
		 * and SetLatch() will provide adequate synchronization on machines
		 * with weak memory ordering, so that we cannot miss seeing is_set
		 * if the signal byte is already in the pipe when we drain it.
		 */
		drainSelfPipe();

		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
		{
			result |= WL_LATCH_SET;
			/*
			 * Leave loop immediately, avoid blocking again. We don't attempt
			 * to report any other events that might also be satisfied.
			 */
			break;
		}

		/* Must wait ... we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
		nfds = 0;
		if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
		{
			/* socket, if used, is always in pfds[0] */
			pfds[0].fd = sock;
			pfds[0].events = 0;
			if (wakeEvents & WL_SOCKET_READABLE)
				pfds[0].events |= POLLIN;
			if (wakeEvents & WL_SOCKET_WRITEABLE)
				pfds[0].events |= POLLOUT;
			pfds[0].revents = 0;
			nfds++;
		}

		pfds[nfds].fd = selfpipe_readfd;
		pfds[nfds].events = POLLIN;
		pfds[nfds].revents = 0;
		nfds++;

		if (wakeEvents & WL_POSTMASTER_DEATH)
		{
			/* postmaster fd, if used, is always in pfds[nfds - 1] */
			pfds[nfds].fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
			pfds[nfds].events = POLLIN;
			pfds[nfds].revents = 0;
			nfds++;
		}

		/* Sleep */
		rc = poll(pfds, nfds, (int) timeout);

		/* Check return code */
		if (rc < 0)
		{
			if (errno == EINTR)
				continue;
			waiting = false;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("poll() failed: %m")));
		}
		if (rc == 0 && (wakeEvents & WL_TIMEOUT))
		{
			/* timeout exceeded */
			result |= WL_TIMEOUT;
		}
		if ((wakeEvents & WL_SOCKET_READABLE) &&
			(pfds[0].revents & POLLIN))
		{
			/* data available in socket */
			result |= WL_SOCKET_READABLE;
		}
		if ((wakeEvents & WL_SOCKET_WRITEABLE) &&
			(pfds[0].revents & POLLOUT))
		{
			result |= WL_SOCKET_WRITEABLE;
		}
		if ((wakeEvents & WL_POSTMASTER_DEATH) &&
			(pfds[nfds - 1].revents & POLLIN))
		{
			result |= WL_POSTMASTER_DEATH;
		}

#else /* !HAVE_POLL */

		FD_ZERO(&input_mask);
		FD_ZERO(&output_mask);

		FD_SET(selfpipe_readfd, &input_mask);
		hifd = selfpipe_readfd;

		if (wakeEvents & WL_POSTMASTER_DEATH)
		{
			FD_SET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask);
			if (postmaster_alive_fds[POSTMASTER_FD_WATCH] > hifd)
				hifd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
		}

		if (wakeEvents & WL_SOCKET_READABLE)
		{
			FD_SET(sock, &input_mask);
			if (sock > hifd)
				hifd = sock;
		}

		if (wakeEvents & WL_SOCKET_WRITEABLE)
		{
			FD_SET(sock, &output_mask);
			if (sock > hifd)
				hifd = sock;
		}

		/* Sleep */
		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);

		/* Check return code */
		if (rc < 0)
		{
			if (errno == EINTR)
				continue;
			waiting = false;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("select() failed: %m")));
		}
		if (rc == 0 && (wakeEvents & WL_TIMEOUT))
		{
			/* timeout exceeded */
			result |= WL_TIMEOUT;
		}
		if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask))
		{
			/* data available in socket */
			result |= WL_SOCKET_READABLE;
		}
		if ((wakeEvents & WL_SOCKET_WRITEABLE) && FD_ISSET(sock, &output_mask))
		{
			result |= WL_SOCKET_WRITEABLE;
		}
		if ((wakeEvents & WL_POSTMASTER_DEATH) &&
			 FD_ISSET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask))
		{
			result |= WL_POSTMASTER_DEATH;
		}
#endif /* HAVE_POLL */
	} while (result == 0);
	waiting = false;

	return result;
}

/*
 * Sets a latch and wakes up anyone waiting on it.
 *
 * This is cheap if the latch is already set, otherwise not so much.
 *
 * NB: when calling this in a signal handler, be sure to save and restore
 * errno around it.  (That's standard practice in most signal handlers, of
 * course, but we used to omit it in handlers that only set a flag.)
 */
void
SetLatch(volatile Latch *latch)
{
	pid_t		owner_pid;

	/*
	 * XXX there really ought to be a memory barrier operation right here,
	 * to ensure that any flag variables we might have changed get flushed
	 * to main memory before we check/set is_set.  Without that, we have to
	 * require that callers provide their own synchronization for machines
	 * with weak memory ordering (see latch.h).
	 */

	/* Quick exit if already set */
	if (latch->is_set)
		return;

	latch->is_set = true;

	/*
	 * See if anyone's waiting for the latch. It can be the current process if
	 * we're in a signal handler. We use the self-pipe to wake up the select()
	 * in that case. If it's another process, send a signal.
	 *
	 * Fetch owner_pid only once, in case the latch is concurrently getting
	 * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
	 * guaranteed to be true! In practice, the effective range of pid_t fits
	 * in a 32 bit integer, and so should be atomic. In the worst case, we
	 * might end up signaling the wrong process. Even then, you're very
	 * unlucky if a process with that bogus pid exists and belongs to
	 * Postgres; and PG database processes should handle excess SIGUSR1
	 * interrupts without a problem anyhow.
	 *
	 * Another sort of race condition that's possible here is for a new process
	 * to own the latch immediately after we look, so we don't signal it.
	 * This is okay so long as all callers of ResetLatch/WaitLatch follow the
	 * standard coding convention of waiting at the bottom of their loops,
	 * not the top, so that they'll correctly process latch-setting events that
	 * happen before they enter the loop.
	 */
	owner_pid = latch->owner_pid;
	if (owner_pid == 0)
		return;
	else if (owner_pid == MyProcPid)
	{
		if (waiting)
			sendSelfPipeByte();
	}
	else
		kill(owner_pid, SIGUSR1);
}

/*
 * Clear the latch. Calling WaitLatch after this will sleep, unless
 * the latch is set again before the WaitLatch call.
 */
void
ResetLatch(volatile Latch *latch)
{
	/* Only the owner should reset the latch */
	Assert(latch->owner_pid == MyProcPid);

	latch->is_set = false;

	/*
	 * XXX there really ought to be a memory barrier operation right here, to
	 * ensure that the write to is_set gets flushed to main memory before we
	 * examine any flag variables.  Otherwise a concurrent SetLatch might
	 * falsely conclude that it needn't signal us, even though we have missed
	 * seeing some flag updates that SetLatch was supposed to inform us of.
	 * For the moment, callers must supply their own synchronization of flag
	 * variables (see latch.h).
	 */
}

/*
 * SetLatch uses SIGUSR1 to wake up the process waiting on the latch.
 *
 * Wake up WaitLatch, if we're waiting.  (We might not be, since SIGUSR1 is
 * overloaded for multiple purposes; or we might not have reached WaitLatch
 * yet, in which case we don't need to fill the pipe either.)
 *
 * NB: when calling this in a signal handler, be sure to save and restore
 * errno around it.
 */
void
latch_sigusr1_handler(void)
{
	if (waiting)
		sendSelfPipeByte();
}

/* initialize the self-pipe */
static void
initSelfPipe(void)
{
	int			pipefd[2];

	/*
	 * Set up the self-pipe that allows a signal handler to wake up the
	 * select() in WaitLatch. Make the write-end non-blocking, so that
	 * SetLatch won't block if the event has already been set many times
	 * filling the kernel buffer. Make the read-end non-blocking too, so that
	 * we can easily clear the pipe by reading until EAGAIN or EWOULDBLOCK.
	 */
	if (pipe(pipefd) < 0)
		elog(FATAL, "pipe() failed: %m");
	if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
	if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");

	selfpipe_readfd = pipefd[0];
	selfpipe_writefd = pipefd[1];
}

/* Send one byte to the self-pipe, to wake up WaitLatch */
static void
sendSelfPipeByte(void)
{
	int			rc;
	char		dummy = 0;

retry:
	rc = write(selfpipe_writefd, &dummy, 1);
	if (rc < 0)
	{
		/* If interrupted by signal, just retry */
		if (errno == EINTR)
			goto retry;

		/*
		 * If the pipe is full, we don't need to retry, the data that's there
		 * already is enough to wake up WaitLatch.
		 */
		if (errno == EAGAIN || errno == EWOULDBLOCK)
			return;

		/*
		 * Oops, the write() failed for some other reason. We might be in a
		 * signal handler, so it's not safe to elog(). We have no choice but
		 * silently ignore the error.
		 */
		return;
	}
}

/*
 * Read all available data from the self-pipe
 *
 * Note: this is only called when waiting = true.  If it fails and doesn't
 * return, it must reset that flag first (though ideally, this will never
 * happen).
 */
static void
drainSelfPipe(void)
{
	/*
	 * There shouldn't normally be more than one byte in the pipe, or maybe a
	 * few bytes if multiple processes run SetLatch at the same instant.
	 */
	char		buf[16];
	int			rc;

	for (;;)
	{
		rc = read(selfpipe_readfd, buf, sizeof(buf));
		if (rc < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;			/* the pipe is empty */
			else if (errno == EINTR)
				continue;		/* retry */
			else
			{
				waiting = false;
				elog(ERROR, "read() on self-pipe failed: %m");
			}
		}
		else if (rc == 0)
		{
			waiting = false;
			elog(ERROR, "unexpected EOF on self-pipe");
		}
		else if (rc < sizeof(buf))
		{
			/* we successfully drained the pipe; no need to read() again */
			break;
		}
		/* else buffer wasn't big enough, so read again */
	}
}