blob: 02e9cd75a011701afd298e8c357a5eb1ffb2444e [file] [log] [blame]
philipel881829b2017-10-13 13:27:23 +02001/*
2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Sebastian Janssonb5374962018-02-07 13:26:38 +010011#include "modules/pacing/round_robin_packet_queue.h"
philipel881829b2017-10-13 13:27:23 +020012
13#include <algorithm>
Yves Gerey988cc082018-10-23 12:03:01 +020014#include <cstdint>
15#include <utility>
philipel881829b2017-10-13 13:27:23 +020016
17#include "rtc_base/checks.h"
philipel881829b2017-10-13 13:27:23 +020018
19namespace webrtc {
Erik Språng82d75a62019-08-09 22:44:47 +020020namespace {
21static constexpr DataSize kMaxLeadingSize = DataSize::Bytes<1400>();
22}
philipel881829b2017-10-13 13:27:23 +020023
Erik Språng58ee1872019-06-18 16:20:11 +020024RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) =
25 default;
26RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
Sebastian Jansson60570dc2018-09-13 17:11:06 +020027
Erik Språng58ee1872019-06-18 16:20:11 +020028RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
29 int priority,
Erik Språngf660e812019-09-01 12:26:44 +000030 RtpPacketToSend::Type type,
31 uint32_t ssrc,
32 uint16_t seq_number,
33 int64_t capture_time_ms,
Erik Språng82d75a62019-08-09 22:44:47 +020034 Timestamp enqueue_time,
35 DataSize size,
Erik Språng58ee1872019-06-18 16:20:11 +020036 bool retransmission,
37 uint64_t enqueue_order,
Erik Språng82d75a62019-08-09 22:44:47 +020038 std::multiset<Timestamp>::iterator enqueue_time_it,
Erik Språngf660e812019-09-01 12:26:44 +000039 absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
40 packet_it)
41 : type_(type),
42 priority_(priority),
43 ssrc_(ssrc),
44 sequence_number_(seq_number),
45 capture_time_ms_(capture_time_ms),
Erik Språng82d75a62019-08-09 22:44:47 +020046 enqueue_time_(enqueue_time),
47 size_(size),
Erik Språng58ee1872019-06-18 16:20:11 +020048 retransmission_(retransmission),
49 enqueue_order_(enqueue_order),
50 enqueue_time_it_(enqueue_time_it),
51 packet_it_(packet_it) {}
Sebastian Jansson60570dc2018-09-13 17:11:06 +020052
Erik Språng58ee1872019-06-18 16:20:11 +020053std::unique_ptr<RtpPacketToSend>
54RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
Erik Språngf660e812019-09-01 12:26:44 +000055 return packet_it_ ? std::move(**packet_it_) : nullptr;
Erik Språng58ee1872019-06-18 16:20:11 +020056}
Sebastian Jansson60570dc2018-09-13 17:11:06 +020057
Erik Språng82d75a62019-08-09 22:44:47 +020058void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime(
59 TimeDelta pause_time_sum) {
60 enqueue_time_ -= pause_time_sum;
Erik Språng58ee1872019-06-18 16:20:11 +020061}
Sebastian Jansson60570dc2018-09-13 17:11:06 +020062
Erik Språng58ee1872019-06-18 16:20:11 +020063bool RoundRobinPacketQueue::QueuedPacket::operator<(
64 const RoundRobinPacketQueue::QueuedPacket& other) const {
65 if (priority_ != other.priority_)
66 return priority_ > other.priority_;
67 if (retransmission_ != other.retransmission_)
68 return other.retransmission_;
69
70 return enqueue_order_ > other.enqueue_order_;
Sebastian Jansson60570dc2018-09-13 17:11:06 +020071}
72
Erik Språng82d75a62019-08-09 22:44:47 +020073RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {}
Mirko Bonadeib471c902018-07-18 14:11:27 +020074RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
Sebastian Janssonb5374962018-02-07 13:26:38 +010075RoundRobinPacketQueue::Stream::~Stream() {}
philipel881829b2017-10-13 13:27:23 +020076
Erik Språngf660e812019-09-01 12:26:44 +000077bool IsEnabled(const WebRtcKeyValueConfig* field_trials, const char* name) {
78 if (!field_trials) {
79 return false;
80 }
81 return field_trials->Lookup(name).find("Enabled") == 0;
82}
83
Erik Språng7702c8a2019-07-30 22:36:02 +020084RoundRobinPacketQueue::RoundRobinPacketQueue(
Erik Språng82d75a62019-08-09 22:44:47 +020085 Timestamp start_time,
Erik Språng7702c8a2019-07-30 22:36:02 +020086 const WebRtcKeyValueConfig* field_trials)
Erik Språng82d75a62019-08-09 22:44:47 +020087 : time_last_updated_(start_time),
88 paused_(false),
89 size_packets_(0),
90 size_(DataSize::Zero()),
91 max_size_(kMaxLeadingSize),
92 queue_time_sum_(TimeDelta::Zero()),
Erik Språngf660e812019-09-01 12:26:44 +000093 pause_time_sum_(TimeDelta::Zero()),
94 send_side_bwe_with_overhead_(
95 IsEnabled(field_trials, "WebRTC-SendSideBwe-WithOverhead")) {}
philipel881829b2017-10-13 13:27:23 +020096
Sebastian Janssonb5374962018-02-07 13:26:38 +010097RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
philipel881829b2017-10-13 13:27:23 +020098
Erik Språng58ee1872019-06-18 16:20:11 +020099void RoundRobinPacketQueue::Push(int priority,
Erik Språngf660e812019-09-01 12:26:44 +0000100 RtpPacketToSend::Type type,
101 uint32_t ssrc,
102 uint16_t seq_number,
103 int64_t capture_time_ms,
104 Timestamp enqueue_time,
105 DataSize size,
106 bool retransmission,
107 uint64_t enqueue_order) {
108 Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms,
109 enqueue_time, size, retransmission, enqueue_order,
110 enqueue_times_.insert(enqueue_time), absl::nullopt));
111}
112
113void RoundRobinPacketQueue::Push(int priority,
Erik Språng82d75a62019-08-09 22:44:47 +0200114 Timestamp enqueue_time,
Erik Språng58ee1872019-06-18 16:20:11 +0200115 uint64_t enqueue_order,
116 std::unique_ptr<RtpPacketToSend> packet) {
Erik Språngf660e812019-09-01 12:26:44 +0000117 uint32_t ssrc = packet->Ssrc();
118 uint16_t sequence_number = packet->SequenceNumber();
119 int64_t capture_time_ms = packet->capture_time_ms();
120 DataSize size =
121 DataSize::bytes(send_side_bwe_with_overhead_
122 ? packet->size()
123 : packet->payload_size() + packet->padding_size());
124 auto type = packet->packet_type();
125 RTC_DCHECK(type.has_value());
126
Erik Språng58ee1872019-06-18 16:20:11 +0200127 rtp_packets_.push_front(std::move(packet));
Erik Språngf660e812019-09-01 12:26:44 +0000128 Push(QueuedPacket(
129 priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
130 size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
131 enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
Erik Språng58ee1872019-06-18 16:20:11 +0200132}
133
Erik Språngf660e812019-09-01 12:26:44 +0000134RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() {
135 RTC_CHECK(!pop_packet_ && !pop_stream_);
philipelccdfcca2017-10-23 12:42:17 +0200136
137 Stream* stream = GetHighestPriorityStream();
Erik Språngf660e812019-09-01 12:26:44 +0000138 pop_stream_.emplace(stream);
139 pop_packet_.emplace(stream->packet_queue.top());
philipelccdfcca2017-10-23 12:42:17 +0200140 stream->packet_queue.pop();
141
Erik Språngf660e812019-09-01 12:26:44 +0000142 return &pop_packet_.value();
143}
philipel881829b2017-10-13 13:27:23 +0200144
Erik Språngf660e812019-09-01 12:26:44 +0000145void RoundRobinPacketQueue::CancelPop() {
146 RTC_CHECK(pop_packet_ && pop_stream_);
147 (*pop_stream_)->packet_queue.push(*pop_packet_);
148 pop_packet_.reset();
149 pop_stream_.reset();
150}
philipelccdfcca2017-10-23 12:42:17 +0200151
Erik Språngf660e812019-09-01 12:26:44 +0000152void RoundRobinPacketQueue::FinalizePop() {
153 if (!Empty()) {
154 RTC_CHECK(pop_packet_ && pop_stream_);
155 Stream* stream = *pop_stream_;
156 stream_priorities_.erase(stream->priority_it);
157 const QueuedPacket& packet = *pop_packet_;
philipel881829b2017-10-13 13:27:23 +0200158
Erik Språngf660e812019-09-01 12:26:44 +0000159 // Calculate the total amount of time spent by this packet in the queue
160 // while in a non-paused state. Note that the |pause_time_sum_ms_| was
161 // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
162 // by subtracting it now we effectively remove the time spent in in the
163 // queue while in a paused state.
164 TimeDelta time_in_non_paused_state =
165 time_last_updated_ - packet.enqueue_time() - pause_time_sum_;
166 queue_time_sum_ -= time_in_non_paused_state;
philipel881829b2017-10-13 13:27:23 +0200167
Erik Språngf660e812019-09-01 12:26:44 +0000168 RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
169 enqueue_times_.erase(packet.EnqueueTimeIterator());
Erik Språng58ee1872019-06-18 16:20:11 +0200170
Erik Språngf660e812019-09-01 12:26:44 +0000171 auto packet_it = packet.PacketIterator();
172 if (packet_it) {
173 rtp_packets_.erase(*packet_it);
174 }
philipel881829b2017-10-13 13:27:23 +0200175
Erik Språngf660e812019-09-01 12:26:44 +0000176 // Update |bytes| of this stream. The general idea is that the stream that
177 // has sent the least amount of bytes should have the highest priority.
178 // The problem with that is if streams send with different rates, in which
179 // case a "budget" will be built up for the stream sending at the lower
180 // rate. To avoid building a too large budget we limit |bytes| to be within
181 // kMaxLeading bytes of the stream that has sent the most amount of bytes.
182 stream->size =
183 std::max(stream->size + packet.size(), max_size_ - kMaxLeadingSize);
184 max_size_ = std::max(max_size_, stream->size);
philipel881829b2017-10-13 13:27:23 +0200185
Erik Språngf660e812019-09-01 12:26:44 +0000186 size_ -= packet.size();
187 size_packets_ -= 1;
188 RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
189
190 // If there are packets left to be sent, schedule the stream again.
191 RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
192 if (stream->packet_queue.empty()) {
193 stream->priority_it = stream_priorities_.end();
194 } else {
195 int priority = stream->packet_queue.top().priority();
196 stream->priority_it = stream_priorities_.emplace(
197 StreamPrioKey(priority, stream->size), stream->ssrc);
198 }
199
200 pop_packet_.reset();
201 pop_stream_.reset();
philipel881829b2017-10-13 13:27:23 +0200202 }
203}
204
Sebastian Janssonb5374962018-02-07 13:26:38 +0100205bool RoundRobinPacketQueue::Empty() const {
philipel881829b2017-10-13 13:27:23 +0200206 RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) ||
207 (stream_priorities_.empty() && size_packets_ == 0));
208 return stream_priorities_.empty();
209}
210
Sebastian Janssonb5374962018-02-07 13:26:38 +0100211size_t RoundRobinPacketQueue::SizeInPackets() const {
philipel881829b2017-10-13 13:27:23 +0200212 return size_packets_;
213}
214
Erik Språng82d75a62019-08-09 22:44:47 +0200215DataSize RoundRobinPacketQueue::Size() const {
216 return size_;
philipel881829b2017-10-13 13:27:23 +0200217}
218
Erik Språngeb487992019-11-14 14:15:15 +0100219bool RoundRobinPacketQueue::NextPacketIsAudio() const {
220 if (stream_priorities_.empty()) {
221 return false;
222 }
223 uint32_t ssrc = stream_priorities_.begin()->second;
224
225 auto stream_info_it = streams_.find(ssrc);
226 return stream_info_it->second.packet_queue.top().type() ==
227 RtpPacketToSend::Type::kAudio;
228}
229
Erik Språng82d75a62019-08-09 22:44:47 +0200230Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const {
philipel881829b2017-10-13 13:27:23 +0200231 if (Empty())
Erik Språng82d75a62019-08-09 22:44:47 +0200232 return Timestamp::MinusInfinity();
philipel881829b2017-10-13 13:27:23 +0200233 RTC_CHECK(!enqueue_times_.empty());
234 return *enqueue_times_.begin();
235}
236
Erik Språng82d75a62019-08-09 22:44:47 +0200237void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) {
238 RTC_CHECK_GE(now, time_last_updated_);
239 if (now == time_last_updated_)
philipel881829b2017-10-13 13:27:23 +0200240 return;
241
Erik Språng82d75a62019-08-09 22:44:47 +0200242 TimeDelta delta = now - time_last_updated_;
philipel881829b2017-10-13 13:27:23 +0200243
244 if (paused_) {
Erik Språng82d75a62019-08-09 22:44:47 +0200245 pause_time_sum_ += delta;
philipel881829b2017-10-13 13:27:23 +0200246 } else {
Erik Språng82d75a62019-08-09 22:44:47 +0200247 queue_time_sum_ += TimeDelta::us(delta.us() * size_packets_);
philipel881829b2017-10-13 13:27:23 +0200248 }
249
Erik Språng82d75a62019-08-09 22:44:47 +0200250 time_last_updated_ = now;
philipel881829b2017-10-13 13:27:23 +0200251}
252
Erik Språng82d75a62019-08-09 22:44:47 +0200253void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) {
philipel881829b2017-10-13 13:27:23 +0200254 if (paused_ == paused)
255 return;
Erik Språng82d75a62019-08-09 22:44:47 +0200256 UpdateQueueTime(now);
philipel881829b2017-10-13 13:27:23 +0200257 paused_ = paused;
258}
259
Erik Språng82d75a62019-08-09 22:44:47 +0200260TimeDelta RoundRobinPacketQueue::AverageQueueTime() const {
philipel881829b2017-10-13 13:27:23 +0200261 if (Empty())
Erik Språng82d75a62019-08-09 22:44:47 +0200262 return TimeDelta::Zero();
263 return queue_time_sum_ / size_packets_;
philipel881829b2017-10-13 13:27:23 +0200264}
265
Erik Språngf660e812019-09-01 12:26:44 +0000266void RoundRobinPacketQueue::Push(QueuedPacket packet) {
267 auto stream_info_it = streams_.find(packet.ssrc());
Erik Språng58ee1872019-06-18 16:20:11 +0200268 if (stream_info_it == streams_.end()) {
Erik Språngf660e812019-09-01 12:26:44 +0000269 stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
Erik Språng58ee1872019-06-18 16:20:11 +0200270 stream_info_it->second.priority_it = stream_priorities_.end();
Erik Språngf660e812019-09-01 12:26:44 +0000271 stream_info_it->second.ssrc = packet.ssrc();
Erik Språng58ee1872019-06-18 16:20:11 +0200272 }
273
274 Stream* stream = &stream_info_it->second;
275
276 if (stream->priority_it == stream_priorities_.end()) {
277 // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
278 RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
279 stream->priority_it = stream_priorities_.emplace(
Erik Språngf660e812019-09-01 12:26:44 +0000280 StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
Erik Språng58ee1872019-06-18 16:20:11 +0200281 } else if (packet.priority() < stream->priority_it->first.priority) {
282 // If the priority of this SSRC increased, remove the outdated StreamPrioKey
283 // and insert a new one with the new priority. Note that |priority_| uses
284 // lower ordinal for higher priority.
285 stream_priorities_.erase(stream->priority_it);
286 stream->priority_it = stream_priorities_.emplace(
Erik Språngf660e812019-09-01 12:26:44 +0000287 StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
Erik Språng58ee1872019-06-18 16:20:11 +0200288 }
289 RTC_CHECK(stream->priority_it != stream_priorities_.end());
290
291 // In order to figure out how much time a packet has spent in the queue while
292 // not in a paused state, we subtract the total amount of time the queue has
293 // been paused so far, and when the packet is popped we subtract the total
294 // amount of time the queue has been paused at that moment. This way we
295 // subtract the total amount of time the packet has spent in the queue while
296 // in a paused state.
Erik Språng82d75a62019-08-09 22:44:47 +0200297 UpdateQueueTime(packet.enqueue_time());
298 packet.SubtractPauseTime(pause_time_sum_);
Erik Språng58ee1872019-06-18 16:20:11 +0200299
300 size_packets_ += 1;
Erik Språng82d75a62019-08-09 22:44:47 +0200301 size_ += packet.size();
Erik Språng58ee1872019-06-18 16:20:11 +0200302
303 stream->packet_queue.push(packet);
304}
305
Sebastian Janssonb5374962018-02-07 13:26:38 +0100306RoundRobinPacketQueue::Stream*
307RoundRobinPacketQueue::GetHighestPriorityStream() {
philipel881829b2017-10-13 13:27:23 +0200308 RTC_CHECK(!stream_priorities_.empty());
309 uint32_t ssrc = stream_priorities_.begin()->second;
310
311 auto stream_info_it = streams_.find(ssrc);
312 RTC_CHECK(stream_info_it != streams_.end());
313 RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
314 RTC_CHECK(!stream_info_it->second.packet_queue.empty());
315 return &stream_info_it->second;
316}
317
Sebastian Janssonb5374962018-02-07 13:26:38 +0100318bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const {
philipel881829b2017-10-13 13:27:23 +0200319 for (const auto& scheduled_stream : stream_priorities_) {
320 if (scheduled_stream.second == ssrc)
321 return true;
322 }
323 return false;
324}
325
326} // namespace webrtc