summaryrefslogtreecommitdiff
path: root/chromium/net/third_party/quiche/src/quic/test_tools/simulator/queue.cc
blob: 5113cf79791a27eb1ec0db09932c30cab51a8c15 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "quic/test_tools/simulator/queue.h"

#include "quic/platform/api/quic_logging.h"
#include "quic/test_tools/simulator/simulator.h"

namespace quic {
namespace simulator {

Queue::ListenerInterface::~ListenerInterface() {}

Queue::Queue(Simulator* simulator, std::string name, QuicByteCount capacity)
    : Actor(simulator, name),
      capacity_(capacity),
      bytes_queued_(0),
      aggregation_threshold_(0),
      aggregation_timeout_(QuicTime::Delta::Infinite()),
      current_bundle_(0),
      current_bundle_bytes_(0),
      tx_port_(nullptr),
      listener_(nullptr) {
  aggregation_timeout_alarm_.reset(simulator_->GetAlarmFactory()->CreateAlarm(
      new AggregationAlarmDelegate(this)));
}

Queue::~Queue() {}

void Queue::set_tx_port(ConstrainedPortInterface* port) {
  tx_port_ = port;
}

void Queue::AcceptPacket(std::unique_ptr<Packet> packet) {
  if (packet->size + bytes_queued_ > capacity_) {
    QUIC_DVLOG(1) << "Queue [" << name() << "] has received a packet from ["
                  << packet->source << "] to [" << packet->destination
                  << "] which is over capacity.  Dropping it.";
    QUIC_DVLOG(1) << "Queue size: " << bytes_queued_ << " out of " << capacity_
                  << ".  Packet size: " << packet->size;
    return;
  }

  bytes_queued_ += packet->size;
  queue_.emplace(std::move(packet), current_bundle_);

  if (IsAggregationEnabled()) {
    current_bundle_bytes_ += queue_.front().packet->size;
    if (!aggregation_timeout_alarm_->IsSet()) {
      aggregation_timeout_alarm_->Set(clock_->Now() + aggregation_timeout_);
    }
    if (current_bundle_bytes_ >= aggregation_threshold_) {
      NextBundle();
    }
  }

  ScheduleNextPacketDequeue();
}

void Queue::Act() {
  QUICHE_DCHECK(!queue_.empty());
  if (tx_port_->TimeUntilAvailable().IsZero()) {
    QUICHE_DCHECK(bytes_queued_ >= queue_.front().packet->size);
    bytes_queued_ -= queue_.front().packet->size;

    tx_port_->AcceptPacket(std::move(queue_.front().packet));
    queue_.pop();
    if (listener_ != nullptr) {
      listener_->OnPacketDequeued();
    }
  }

  ScheduleNextPacketDequeue();
}

void Queue::EnableAggregation(QuicByteCount aggregation_threshold,
                              QuicTime::Delta aggregation_timeout) {
  QUICHE_DCHECK_EQ(bytes_queued_, 0u);
  QUICHE_DCHECK_GT(aggregation_threshold, 0u);
  QUICHE_DCHECK(!aggregation_timeout.IsZero());
  QUICHE_DCHECK(!aggregation_timeout.IsInfinite());

  aggregation_threshold_ = aggregation_threshold;
  aggregation_timeout_ = aggregation_timeout;
}

Queue::AggregationAlarmDelegate::AggregationAlarmDelegate(Queue* queue)
    : queue_(queue) {}

void Queue::AggregationAlarmDelegate::OnAlarm() {
  queue_->NextBundle();
  queue_->ScheduleNextPacketDequeue();
}

Queue::EnqueuedPacket::EnqueuedPacket(std::unique_ptr<Packet> packet,
                                      AggregationBundleNumber bundle)
    : packet(std::move(packet)), bundle(bundle) {}

Queue::EnqueuedPacket::EnqueuedPacket(EnqueuedPacket&& other) = default;

Queue::EnqueuedPacket::~EnqueuedPacket() = default;

void Queue::NextBundle() {
  current_bundle_++;
  current_bundle_bytes_ = 0;
  aggregation_timeout_alarm_->Cancel();
}

void Queue::ScheduleNextPacketDequeue() {
  if (queue_.empty()) {
    QUICHE_DCHECK_EQ(bytes_queued_, 0u);
    return;
  }

  if (IsAggregationEnabled() && queue_.front().bundle == current_bundle_) {
    return;
  }

  QuicTime::Delta time_until_available = QuicTime::Delta::Zero();
  if (tx_port_) {
    time_until_available = tx_port_->TimeUntilAvailable();
  }

  Schedule(clock_->Now() + time_until_available);
}

}  // namespace simulator
}  // namespace quic