blob: 0989b45966740ea9ee686ee97ab9685c20bba1e2 [file] [log] [blame]
philipel881829b2017-10-13 13:27:23 +02001/*
2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Sebastian Janssonb5374962018-02-07 13:26:38 +010011#include "modules/pacing/round_robin_packet_queue.h"
philipel881829b2017-10-13 13:27:23 +020012
13#include <algorithm>
14
15#include "rtc_base/checks.h"
16#include "system_wrappers/include/clock.h"
17
18namespace webrtc {
19
Erik Språng96816752018-09-04 18:40:19 +020020RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {}
Mirko Bonadeib471c902018-07-18 14:11:27 +020021RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
Sebastian Janssonb5374962018-02-07 13:26:38 +010022RoundRobinPacketQueue::Stream::~Stream() {}
philipel881829b2017-10-13 13:27:23 +020023
Sebastian Janssonb5374962018-02-07 13:26:38 +010024RoundRobinPacketQueue::RoundRobinPacketQueue(const Clock* clock)
Erik Språng96816752018-09-04 18:40:19 +020025 : time_last_updated_(clock->TimeInMilliseconds()) {}
philipel881829b2017-10-13 13:27:23 +020026
Sebastian Janssonb5374962018-02-07 13:26:38 +010027RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
philipel881829b2017-10-13 13:27:23 +020028
Sebastian Janssonb5374962018-02-07 13:26:38 +010029void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) {
philipel881829b2017-10-13 13:27:23 +020030 Packet packet(packet_to_insert);
31
32 auto stream_info_it = streams_.find(packet.ssrc);
33 if (stream_info_it == streams_.end()) {
34 stream_info_it = streams_.emplace(packet.ssrc, Stream()).first;
35 stream_info_it->second.priority_it = stream_priorities_.end();
36 stream_info_it->second.ssrc = packet.ssrc;
37 }
38
39 Stream* streams_ = &stream_info_it->second;
40
41 if (streams_->priority_it == stream_priorities_.end()) {
42 // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
43 RTC_CHECK(!IsSsrcScheduled(streams_->ssrc));
44 streams_->priority_it = stream_priorities_.emplace(
45 StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc);
46 } else if (packet.priority < streams_->priority_it->first.priority) {
47 // If the priority of this SSRC increased, remove the outdated StreamPrioKey
48 // and insert a new one with the new priority. Note that
49 // RtpPacketSender::Priority uses lower ordinal for higher priority.
50 stream_priorities_.erase(streams_->priority_it);
51 streams_->priority_it = stream_priorities_.emplace(
52 StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc);
53 }
54 RTC_CHECK(streams_->priority_it != stream_priorities_.end());
55
56 packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms);
57
58 // In order to figure out how much time a packet has spent in the queue while
59 // not in a paused state, we subtract the total amount of time the queue has
60 // been paused so far, and when the packet is poped we subtract the total
61 // amount of time the queue has been paused at that moment. This way we
62 // subtract the total amount of time the packet has spent in the queue while
63 // in a paused state.
64 UpdateQueueTime(packet.enqueue_time_ms);
65 packet.enqueue_time_ms -= pause_time_sum_ms_;
66 streams_->packet_queue.push(packet);
67
68 size_packets_ += 1;
69 size_bytes_ += packet.bytes;
70}
71
Sebastian Janssonb5374962018-02-07 13:26:38 +010072const PacketQueueInterface::Packet& RoundRobinPacketQueue::BeginPop() {
philipelccdfcca2017-10-23 12:42:17 +020073 RTC_CHECK(!pop_packet_ && !pop_stream_);
74
75 Stream* stream = GetHighestPriorityStream();
76 pop_stream_.emplace(stream);
77 pop_packet_.emplace(stream->packet_queue.top());
78 stream->packet_queue.pop();
79
80 return *pop_packet_;
philipel881829b2017-10-13 13:27:23 +020081}
82
Sebastian Janssonb5374962018-02-07 13:26:38 +010083void RoundRobinPacketQueue::CancelPop(const Packet& packet) {
philipelccdfcca2017-10-23 12:42:17 +020084 RTC_CHECK(pop_packet_ && pop_stream_);
85 (*pop_stream_)->packet_queue.push(*pop_packet_);
86 pop_packet_.reset();
87 pop_stream_.reset();
88}
89
Sebastian Janssonb5374962018-02-07 13:26:38 +010090void RoundRobinPacketQueue::FinalizePop(const Packet& packet) {
philipel881829b2017-10-13 13:27:23 +020091 if (!Empty()) {
philipelccdfcca2017-10-23 12:42:17 +020092 RTC_CHECK(pop_packet_ && pop_stream_);
93 Stream* stream = *pop_stream_;
94 stream_priorities_.erase(stream->priority_it);
95 const Packet& packet = *pop_packet_;
philipel881829b2017-10-13 13:27:23 +020096
97 // Calculate the total amount of time spent by this packet in the queue
98 // while in a non-paused state. Note that the |pause_time_sum_ms_| was
99 // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
100 // by subtracting it now we effectively remove the time spent in in the
101 // queue while in a paused state.
102 int64_t time_in_non_paused_state_ms =
103 time_last_updated_ - packet.enqueue_time_ms - pause_time_sum_ms_;
104 queue_time_sum_ms_ -= time_in_non_paused_state_ms;
105
106 RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end());
107 enqueue_times_.erase(packet.enqueue_time_it);
108
109 // Update |bytes| of this stream. The general idea is that the stream that
110 // has sent the least amount of bytes should have the highest priority.
111 // The problem with that is if streams send with different rates, in which
112 // case a "budget" will be built up for the stream sending at the lower
113 // rate. To avoid building a too large budget we limit |bytes| to be within
114 // kMaxLeading bytes of the stream that has sent the most amount of bytes.
philipelccdfcca2017-10-23 12:42:17 +0200115 stream->bytes =
116 std::max(stream->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes);
117 max_bytes_ = std::max(max_bytes_, stream->bytes);
philipel881829b2017-10-13 13:27:23 +0200118
119 size_bytes_ -= packet.bytes;
120 size_packets_ -= 1;
121 RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0);
philipel881829b2017-10-13 13:27:23 +0200122
123 // If there are packets left to be sent, schedule the stream again.
philipelccdfcca2017-10-23 12:42:17 +0200124 RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
125 if (stream->packet_queue.empty()) {
126 stream->priority_it = stream_priorities_.end();
philipel881829b2017-10-13 13:27:23 +0200127 } else {
philipelccdfcca2017-10-23 12:42:17 +0200128 RtpPacketSender::Priority priority = stream->packet_queue.top().priority;
129 stream->priority_it = stream_priorities_.emplace(
130 StreamPrioKey(priority, stream->bytes), stream->ssrc);
philipel881829b2017-10-13 13:27:23 +0200131 }
philipelccdfcca2017-10-23 12:42:17 +0200132
133 pop_packet_.reset();
134 pop_stream_.reset();
philipel881829b2017-10-13 13:27:23 +0200135 }
136}
137
Sebastian Janssonb5374962018-02-07 13:26:38 +0100138bool RoundRobinPacketQueue::Empty() const {
philipel881829b2017-10-13 13:27:23 +0200139 RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) ||
140 (stream_priorities_.empty() && size_packets_ == 0));
141 return stream_priorities_.empty();
142}
143
Sebastian Janssonb5374962018-02-07 13:26:38 +0100144size_t RoundRobinPacketQueue::SizeInPackets() const {
philipel881829b2017-10-13 13:27:23 +0200145 return size_packets_;
146}
147
Sebastian Janssonb5374962018-02-07 13:26:38 +0100148uint64_t RoundRobinPacketQueue::SizeInBytes() const {
philipel881829b2017-10-13 13:27:23 +0200149 return size_bytes_;
150}
151
Sebastian Janssonb5374962018-02-07 13:26:38 +0100152int64_t RoundRobinPacketQueue::OldestEnqueueTimeMs() const {
philipel881829b2017-10-13 13:27:23 +0200153 if (Empty())
154 return 0;
155 RTC_CHECK(!enqueue_times_.empty());
156 return *enqueue_times_.begin();
157}
158
Sebastian Janssonb5374962018-02-07 13:26:38 +0100159void RoundRobinPacketQueue::UpdateQueueTime(int64_t timestamp_ms) {
philipel881829b2017-10-13 13:27:23 +0200160 RTC_CHECK_GE(timestamp_ms, time_last_updated_);
161 if (timestamp_ms == time_last_updated_)
162 return;
163
164 int64_t delta_ms = timestamp_ms - time_last_updated_;
165
166 if (paused_) {
167 pause_time_sum_ms_ += delta_ms;
168 } else {
169 queue_time_sum_ms_ += delta_ms * size_packets_;
170 }
171
172 time_last_updated_ = timestamp_ms;
173}
174
Sebastian Janssonb5374962018-02-07 13:26:38 +0100175void RoundRobinPacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) {
philipel881829b2017-10-13 13:27:23 +0200176 if (paused_ == paused)
177 return;
178 UpdateQueueTime(timestamp_ms);
179 paused_ = paused;
180}
181
Sebastian Janssonb5374962018-02-07 13:26:38 +0100182int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const {
philipel881829b2017-10-13 13:27:23 +0200183 if (Empty())
184 return 0;
185 return queue_time_sum_ms_ / size_packets_;
186}
187
Sebastian Janssonb5374962018-02-07 13:26:38 +0100188RoundRobinPacketQueue::Stream*
189RoundRobinPacketQueue::GetHighestPriorityStream() {
philipel881829b2017-10-13 13:27:23 +0200190 RTC_CHECK(!stream_priorities_.empty());
191 uint32_t ssrc = stream_priorities_.begin()->second;
192
193 auto stream_info_it = streams_.find(ssrc);
194 RTC_CHECK(stream_info_it != streams_.end());
195 RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
196 RTC_CHECK(!stream_info_it->second.packet_queue.empty());
197 return &stream_info_it->second;
198}
199
Sebastian Janssonb5374962018-02-07 13:26:38 +0100200bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const {
philipel881829b2017-10-13 13:27:23 +0200201 for (const auto& scheduled_stream : stream_priorities_) {
202 if (scheduled_stream.second == ssrc)
203 return true;
204 }
205 return false;
206}
207
208} // namespace webrtc