blob: e131755a04a3b0b82f718495805a2823097b9d0c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import os
from nose import SkipTest
from sys import platform
from kazoo.testing import KazooTestCase
class KazooInterruptTests(KazooTestCase):
def test_interrupted_systemcall(self):
'''
Make sure interrupted system calls don't break the world, since we
can't control what all signals our connection thread will get
'''
if 'linux' not in platform:
raise SkipTest('Unable to reproduce error case on'
' non-linux platforms')
path = 'interrupt_test'
value = b"1"
self.client.create(path, value)
# set the euid to the current process' euid.
# glibc sends SIGRT to all children, which will interrupt the
# system call
os.seteuid(os.geteuid())
# basic sanity test that it worked alright
assert self.client.get(path)[0] == value
|