summaryrefslogtreecommitdiff
path: root/chromium/testing/xvfb_unittest.py
blob: 8abfbad2720ff68f2491cbaa2461afcc0312094f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
# Copyright 2019 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for xvfb.py functionality.

Each unit test is launching xvfb_test_script.py
through xvfb.py as a subprocess, then tests its expected output.
"""

import os
import signal
import subprocess
import sys
import time
import unittest

# pylint: disable=super-with-arguments


TEST_FILE = __file__.replace('.pyc', '.py')
XVFB = TEST_FILE.replace('_unittest', '')
XVFB_TEST_SCRIPT = TEST_FILE.replace('_unittest', '_test_script')


def launch_process(args):
  """Launches a sub process to run through xvfb.py."""
  return subprocess.Popen(
      [XVFB, XVFB_TEST_SCRIPT] + args, stdout=subprocess.PIPE,
      stderr=subprocess.STDOUT, env=os.environ.copy())


# pylint: disable=inconsistent-return-statements
def read_subprocess_message(proc, starts_with):
  """Finds the value after first line prefix condition."""
  for line in proc.stdout.read().decode('utf-8').splitlines(True):
    if str(line).startswith(starts_with):
      return line.rstrip().replace(starts_with, '')
# pylint: enable=inconsistent-return-statements


def send_signal(proc, sig, sleep_time=0.3):
  """Sends a signal to subprocess."""
  time.sleep(sleep_time)  # gives process time to launch.
  os.kill(proc.pid, sig)
  proc.wait()


class XvfbLinuxTest(unittest.TestCase):

  def setUp(self):
    super(XvfbLinuxTest, self).setUp()
    if not sys.platform.startswith('linux'):
      self.skipTest('linux only test')
    self._procs = []

  def test_no_xvfb_display(self):
    self._procs.append(launch_process(['--no-xvfb']))
    self._procs[0].wait()
    display = read_subprocess_message(self._procs[0], 'Display :')
    self.assertEqual(display, os.environ.get('DISPLAY', 'None'))

  def test_xvfb_display(self):
    self._procs.append(launch_process([]))
    self._procs[0].wait()
    display = read_subprocess_message(self._procs[0], 'Display :')
    self.assertIsNotNone(display) # Openbox likely failed to open DISPLAY
    self.assertNotEqual(display, os.environ.get('DISPLAY', 'None'))

  def test_no_xvfb_flag(self):
    self._procs.append(launch_process(['--no-xvfb']))
    self._procs[0].wait()

  def test_xvfb_flag(self):
    self._procs.append(launch_process([]))
    self._procs[0].wait()

  def test_xvfb_race_condition(self):
    self._procs = [launch_process([]) for _ in range(15)]
    for proc in self._procs:
      proc.wait()
    display_list = [read_subprocess_message(p, 'Display :')
                    for p in self._procs]
    for display in display_list:
      self.assertIsNotNone(display) # Openbox likely failed to open DISPLAY
      self.assertNotEqual(display, os.environ.get('DISPLAY', 'None'))

  def tearDown(self):
    super(XvfbLinuxTest, self).tearDown()
    for proc in self._procs:
      if proc.stdout:
        proc.stdout.close()



class XvfbTest(unittest.TestCase):

  def setUp(self):
    super(XvfbTest, self).setUp()
    if sys.platform == 'win32':
      self.skipTest('non-win32 test')
    self._proc = None


  def test_send_sigint(self):
    self._proc = launch_process(['--sleep'])
    send_signal(self._proc, signal.SIGINT, 1)
    sig = read_subprocess_message(self._proc, 'Signal :')
    self.assertIsNotNone(sig) # OpenBox likely failed to start
    self.assertEqual(int(sig), int(signal.SIGINT))

  def test_send_sigterm(self):
    self._proc = launch_process(['--sleep'])
    send_signal(self._proc, signal.SIGTERM, 1)
    sig = read_subprocess_message(self._proc, 'Signal :')
    self.assertIsNotNone(sig) # OpenBox likely failed to start
    self.assertEqual(int(sig), int(signal.SIGTERM))

  def tearDown(self):
    super(XvfbTest, self).tearDown()
    if self._proc.stdout:
      self._proc.stdout.close()

if __name__ == '__main__':
  unittest.main()