blob: ad4ae5d64b1ac96b1d69ea9ce778df373307b5c2 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
import os
from sys import platform
import pytest
from kazoo.testing import KazooTestCase
class KazooInterruptTests(KazooTestCase):
    """Checks that the client keeps working after an interrupted syscall."""

    def test_interrupted_systemcall(self):
        """
        Make sure interrupted system calls don't break the world.

        We cannot control which signals the connection thread receives, so
        the client has to keep working after one of its system calls has
        been interrupted.
        """
        running_on_linux = "linux" in platform
        if not running_on_linux:
            pytest.skip(
                "Unable to reproduce error case on non-linux platforms"
            )

        node_path = "interrupt_test"
        payload = b"1"
        self.client.create(node_path, payload)

        # Re-apply the effective uid the process already has. The id itself
        # does not change; the call is made because glibc sends SIGRT to all
        # children when it runs, which interrupts the system call in
        # progress.
        current_euid = os.geteuid()
        os.seteuid(current_euid)

        # Basic sanity check: the node written above must still be readable
        # and carry the same data.
        fetched = self.client.get(node_path)
        stored_data = fetched[0]
        assert stored_data == payload
|