1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quic/test_tools/simulator/queue.h"

#include <memory>
#include <string>
#include <utility>

#include "quic/platform/api/quic_logging.h"
#include "quic/test_tools/simulator/simulator.h"
namespace quic {
namespace simulator {
// Out-of-line destructor for the listener interface; it has nothing to
// release.
Queue::ListenerInterface::~ListenerInterface() = default;
// Constructs a queue that holds at most |capacity| bytes.
//
// |simulator| and |name| are forwarded to the Actor base class.  The queue
// starts with zero bytes queued, no egress port and no listener.  The
// aggregation threshold starts at zero and the aggregation timeout at
// Infinite; EnableAggregation() replaces both with caller-supplied values.
Queue::Queue(Simulator* simulator, std::string name, QuicByteCount capacity)
    // |name| is a by-value sink parameter: move it into the base class rather
    // than paying for a second string copy.
    : Actor(simulator, std::move(name)),
      capacity_(capacity),
      bytes_queued_(0),
      aggregation_threshold_(0),
      aggregation_timeout_(QuicTime::Delta::Infinite()),
      current_bundle_(0),
      current_bundle_bytes_(0),
      tx_port_(nullptr),
      listener_(nullptr) {
  // The delegate calls back into this queue when the aggregation timeout
  // fires (AggregationAlarmDelegate::OnAlarm).
  // NOTE(review): CreateAlarm() presumably takes ownership of the raw
  // delegate pointer -- confirm against the alarm factory interface.
  aggregation_timeout_alarm_.reset(simulator_->GetAlarmFactory()->CreateAlarm(
      new AggregationAlarmDelegate(this)));
}
// Nothing to tear down by hand; the members clean up after themselves.
Queue::~Queue() = default;
// Selects |port| as the egress port that Act() hands dequeued packets to.
// Only the pointer is stored.
void Queue::set_tx_port(ConstrainedPortInterface* port) {
  this->tx_port_ = port;
}
// Offers |packet| to the queue.
//
// If adding the packet would push the queued byte count past |capacity_|, the
// packet is logged and dropped (it is destroyed when this function returns).
// Otherwise it is appended to the queue, tagged with the current aggregation
// bundle number.  When aggregation is enabled, the packet's size is added to
// the current bundle, the aggregation timeout alarm is armed if it is not
// already set, and the bundle is closed once it reaches the aggregation
// threshold.  Finally the next dequeue is scheduled.
void Queue::AcceptPacket(std::unique_ptr<Packet> packet) {
  // Read the size up front: |packet| is moved into the queue below and must
  // not be dereferenced afterwards.
  const auto packet_size = packet->size;

  if (packet_size + bytes_queued_ > capacity_) {
    QUIC_DVLOG(1) << "Queue [" << name() << "] has received a packet from ["
                  << packet->source << "] to [" << packet->destination
                  << "] which is over capacity.  Dropping it.";
    QUIC_DVLOG(1) << "Queue size: " << bytes_queued_ << " out of " << capacity_
                  << ".  Packet size: " << packet_size;
    return;
  }

  bytes_queued_ += packet_size;
  queue_.emplace(std::move(packet), current_bundle_);

  if (IsAggregationEnabled()) {
    // Charge the bundle with the packet that was just enqueued.  The head of
    // the queue (queue_.front()) is the oldest packet, which may have a
    // different size and may even belong to an earlier bundle.
    current_bundle_bytes_ += packet_size;
    if (!aggregation_timeout_alarm_->IsSet()) {
      aggregation_timeout_alarm_->Set(clock_->Now() + aggregation_timeout_);
    }
    if (current_bundle_bytes_ >= aggregation_threshold_) {
      NextBundle();
    }
  }

  ScheduleNextPacketDequeue();
}
// Runs when the time scheduled by ScheduleNextPacketDequeue() arrives.  If
// the egress port can take a packet right now, the head of the queue is handed
// to it and the listener (if any) is told about the dequeue.  In either case
// the next dequeue is then scheduled.
// NOTE(review): |tx_port_| is dereferenced unconditionally here, while
// ScheduleNextPacketDequeue() tolerates a null port -- confirm callers always
// set a port before packets are queued.
void Queue::Act() {
  QUICHE_DCHECK(!queue_.empty());

  const bool port_is_free = tx_port_->TimeUntilAvailable().IsZero();
  if (port_is_free) {
    QUICHE_DCHECK(bytes_queued_ >= queue_.front().packet->size);

    auto& head = queue_.front();
    bytes_queued_ -= head.packet->size;
    // Ownership of the packet moves to the port; the now-empty queue entry is
    // discarded right after.
    tx_port_->AcceptPacket(std::move(head.packet));
    queue_.pop();

    if (listener_ != nullptr) {
      listener_->OnPacketDequeued();
    }
  }

  ScheduleNextPacketDequeue();
}
// Stores the aggregation parameters.  Debug builds check that the queue holds
// no bytes at the time of the call, that |aggregation_threshold| is positive,
// and that |aggregation_timeout| is neither zero nor infinite.
void Queue::EnableAggregation(QuicByteCount aggregation_threshold,
                              QuicTime::Delta aggregation_timeout) {
  QUICHE_DCHECK_EQ(bytes_queued_, 0u);
  QUICHE_DCHECK_GT(aggregation_threshold, 0u);
  QUICHE_DCHECK(!aggregation_timeout.IsZero());
  QUICHE_DCHECK(!aggregation_timeout.IsInfinite());

  this->aggregation_timeout_ = aggregation_timeout;
  this->aggregation_threshold_ = aggregation_threshold;
}
Queue::AggregationAlarmDelegate::AggregationAlarmDelegate(Queue* queue)
: queue_(queue) {}
void Queue::AggregationAlarmDelegate::OnAlarm() {
queue_->NextBundle();
queue_->ScheduleNextPacketDequeue();
}
Queue::EnqueuedPacket::EnqueuedPacket(std::unique_ptr<Packet> packet,
AggregationBundleNumber bundle)
: packet(std::move(packet)), bundle(bundle) {}
// Compiler-generated member-wise move: the packet pointer and the bundle
// number are taken over from |other|.
Queue::EnqueuedPacket::EnqueuedPacket(EnqueuedPacket&& other) = default;
// Compiler-generated destructor; destroying the entry destroys its members.
Queue::EnqueuedPacket::~EnqueuedPacket() = default;
// Closes the current aggregation bundle: advances the bundle number, resets
// the per-bundle byte counter, and cancels the aggregation timeout alarm.
void Queue::NextBundle() {
  ++current_bundle_;
  current_bundle_bytes_ = 0;
  aggregation_timeout_alarm_->Cancel();
}
void Queue::ScheduleNextPacketDequeue() {
if (queue_.empty()) {
QUICHE_DCHECK_EQ(bytes_queued_, 0u);
return;
}
if (IsAggregationEnabled() && queue_.front().bundle == current_bundle_) {
return;
}
QuicTime::Delta time_until_available = QuicTime::Delta::Zero();
if (tx_port_) {
time_until_available = tx_port_->TimeUntilAvailable();
}
Schedule(clock_->Now() + time_until_available);
}
} // namespace simulator
} // namespace quic
|