1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# This file is part of cloud-init. See LICENSE file for license information.
import time
from contextlib import suppress
from unittest.mock import PropertyMock
import pytest
import responses
from cloudinit.reporting import flush_events
from cloudinit.reporting.events import report_start_event
from cloudinit.reporting.handlers import WebHookHandler
class TestWebHookHandler:
@pytest.fixture(autouse=True)
def setup(self, mocker):
handler = WebHookHandler(endpoint="http://localhost")
m_registered_items = mocker.patch(
"cloudinit.registry.DictRegistry.registered_items",
new_callable=PropertyMock,
)
m_registered_items.return_value = {"webhook": handler}
@responses.activate
def test_webhook_handler(self, caplog):
"""Test the happy path."""
responses.add(responses.POST, "http://localhost", status=200)
report_start_event("name", "description")
flush_events()
assert 1 == caplog.text.count(
"Read from http://localhost (200, 0b) after 1 attempts"
)
@responses.activate
def test_404(self, caplog):
"""Test failure"""
responses.add(responses.POST, "http://localhost", status=404)
report_start_event("name", "description")
flush_events()
assert 1 == caplog.text.count("Failed posting event")
@responses.activate
def test_background_processing(self, caplog):
"""Test that processing happens in background.
In the non-flush case, ensure that the event is still posted.
Since the event is posted in the background, wait while looping.
"""
responses.add(responses.POST, "http://localhost", status=200)
report_start_event("name", "description")
start_time = time.time()
while time.time() - start_time < 3:
with suppress(AssertionError):
assert (
"Read from http://localhost (200, 0b) after 1 attempts"
in caplog.text
)
break
else:
pytest.fail("Never got expected log message")
@responses.activate
@pytest.mark.parametrize(
"num_failures,expected_log_count,expected_cancel",
[(2, 2, False), (3, 3, True), (50, 3, True)],
)
def test_failures_cancel_flush(
self, caplog, num_failures, expected_log_count, expected_cancel
):
"""Test that too many failures will cancel further processing on flush.
2 messages should not cancel on flush
3 or more should cancel on flush
The number of received messages will be based on how many have
been processed before the flush was initiated.
"""
responses.add(responses.POST, "http://localhost", status=404)
for _ in range(num_failures):
report_start_event("name", "description")
flush_events()
# Force a context switch. Without this, it's possible that the
# expected log message hasn't made it to the log file yet
time.sleep(0.01)
# If we've pushed a bunch of messages, any number could have been
# processed before we get to the flush.
assert (
expected_log_count
<= caplog.text.count("Failed posting event")
<= num_failures
)
cancelled_message = (
"Multiple consecutive failures in WebHookHandler. "
"Cancelling all queued events"
)
if expected_cancel:
assert cancelled_message in caplog.text
else:
assert cancelled_message not in caplog.text
@responses.activate
def test_multiple_failures_no_flush(self, caplog):
"""Test we don't cancel posting if flush hasn't been requested.
Since processing happens in the background, wait in a loop
for all messages to be posted
"""
responses.add(responses.POST, "http://localhost", status=404)
for _ in range(10):
report_start_event("name", "description")
start_time = time.time()
while time.time() - start_time < 3:
with suppress(AssertionError):
assert 10 == caplog.text.count("Failed posting event")
break
time.sleep(0.01) # Force context switch
else:
pytest.fail(
"Expected 20 failures, only got "
f"{caplog.text.count('Failed posting event')}"
)
|