/*
 *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#include "modules/pacing/packet_queue.h"
12
13#include <algorithm>
14#include <list>
15#include <vector>
16
17#include "modules/include/module_common_types.h"
18#include "modules/pacing/alr_detector.h"
19#include "modules/pacing/bitrate_prober.h"
20#include "modules/pacing/interval_budget.h"
21#include "modules/utility/include/process_thread.h"
22#include "rtc_base/checks.h"
23#include "rtc_base/logging.h"
24#include "system_wrappers/include/clock.h"
25#include "system_wrappers/include/field_trial.h"
26
27namespace webrtc {
28
29PacketQueue::Packet::Packet(RtpPacketSender::Priority priority,
30 uint32_t ssrc,
31 uint16_t seq_number,
32 int64_t capture_time_ms,
33 int64_t enqueue_time_ms,
34 size_t length_in_bytes,
35 bool retransmission,
36 uint64_t enqueue_order)
37 : priority(priority),
38 ssrc(ssrc),
39 sequence_number(seq_number),
40 capture_time_ms(capture_time_ms),
41 enqueue_time_ms(enqueue_time_ms),
42 sum_paused_ms(0),
43 bytes(length_in_bytes),
44 retransmission(retransmission),
45 enqueue_order(enqueue_order) {}
46
47PacketQueue::Packet::~Packet() {}
48
49PacketQueue::PacketQueue(const Clock* clock)
50 : bytes_(0),
51 clock_(clock),
52 queue_time_sum_(0),
53 time_last_updated_(clock_->TimeInMilliseconds()),
54 paused_(false) {}
55
56PacketQueue::~PacketQueue() {}
57
58void PacketQueue::Push(const Packet& packet) {
59 if (!AddToDupeSet(packet))
60 return;
61
62 UpdateQueueTime(packet.enqueue_time_ms);
63
64 // Store packet in list, use pointers in priority queue for cheaper moves.
65 // Packets have a handle to its own iterator in the list, for easy removal
66 // when popping from queue.
67 packet_list_.push_front(packet);
68 std::list<Packet>::iterator it = packet_list_.begin();
69 it->this_it = it; // Handle for direct removal from list.
70 prio_queue_.push(&(*it)); // Pointer into list.
71 bytes_ += packet.bytes;
72}
73
74const PacketQueue::Packet& PacketQueue::BeginPop() {
75 const PacketQueue::Packet& packet = *prio_queue_.top();
76 prio_queue_.pop();
77 return packet;
78}
79
80void PacketQueue::CancelPop(const PacketQueue::Packet& packet) {
81 prio_queue_.push(&(*packet.this_it));
82}
83
84void PacketQueue::FinalizePop(const PacketQueue::Packet& packet) {
85 RemoveFromDupeSet(packet);
86 bytes_ -= packet.bytes;
87 int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
88 RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
89 packet_queue_time_ms -= packet.sum_paused_ms;
90 RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
91 queue_time_sum_ -= packet_queue_time_ms;
92 packet_list_.erase(packet.this_it);
93 RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
94 if (packet_list_.empty())
95 RTC_DCHECK_EQ(0, queue_time_sum_);
96}
97
98bool PacketQueue::Empty() const {
99 return prio_queue_.empty();
100}
101
102size_t PacketQueue::SizeInPackets() const {
103 return prio_queue_.size();
104}
105
106uint64_t PacketQueue::SizeInBytes() const {
107 return bytes_;
108}
109
110int64_t PacketQueue::OldestEnqueueTimeMs() const {
111 auto it = packet_list_.rbegin();
112 if (it == packet_list_.rend())
113 return 0;
114 return it->enqueue_time_ms;
115}
116
117void PacketQueue::UpdateQueueTime(int64_t timestamp_ms) {
118 RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
119 if (timestamp_ms == time_last_updated_)
120 return;
121
122 int64_t delta_ms = timestamp_ms - time_last_updated_;
123
124 if (paused_) {
125 // Increase per-packet accumulators of time spent in queue while paused,
126 // so that we can disregard that when subtracting main accumulator when
127 // popping packet from the queue.
128 for (auto& it : packet_list_) {
129 it.sum_paused_ms += delta_ms;
130 }
131 } else {
132 // Use packet packet_list_.size() not prio_queue_.size() here, as there
133 // might be an outstanding element popped from prio_queue_ currently in
134 // the SendPacket() call, while packet_list_ will always be correct.
135 queue_time_sum_ += delta_ms * packet_list_.size();
136 }
137 time_last_updated_ = timestamp_ms;
138}
139
140void PacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) {
141 if (paused_ == paused)
142 return;
143 UpdateQueueTime(timestamp_ms);
144 paused_ = paused;
145}
146
147int64_t PacketQueue::AverageQueueTimeMs() const {
148 if (prio_queue_.empty())
149 return 0;
150 return queue_time_sum_ / packet_list_.size();
151}
152
153bool PacketQueue::AddToDupeSet(const PacketQueue::Packet& packet) {
154 SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
155 if (it == dupe_map_.end()) {
156 // First for this ssrc, just insert.
157 dupe_map_[packet.ssrc].insert(packet.sequence_number);
158 return true;
159 }
160
161 // Insert returns a pair, where second is a bool set to true if new element.
162 return it->second.insert(packet.sequence_number).second;
163}
164
165void PacketQueue::RemoveFromDupeSet(const PacketQueue::Packet& packet) {
166 SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
167 RTC_DCHECK(it != dupe_map_.end());
168 it->second.erase(packet.sequence_number);
169 if (it->second.empty()) {
170 dupe_map_.erase(it);
171 }
172}
173
174} // namespace webrtc