blob: 6aee807af3691c11636010d2dea2530c8a091f85 [file] [log] [blame]
/*
 *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#include "modules/pacing/packet_queue2.h"
12
13#include <algorithm>
14
15#include "rtc_base/checks.h"
16#include "system_wrappers/include/clock.h"
17
18namespace webrtc {
19
20PacketQueue2::Stream::Stream() : bytes(0) {}
21PacketQueue2::Stream::~Stream() {}
22
philipel881829b2017-10-13 13:27:23 +020023PacketQueue2::PacketQueue2(const Clock* clock)
philipelccdfcca2017-10-23 12:42:17 +020024 : PacketQueue(clock),
25 clock_(clock),
26 time_last_updated_(clock_->TimeInMilliseconds()) {}
philipel881829b2017-10-13 13:27:23 +020027
28PacketQueue2::~PacketQueue2() {}
29
30void PacketQueue2::Push(const Packet& packet_to_insert) {
31 Packet packet(packet_to_insert);
32
33 auto stream_info_it = streams_.find(packet.ssrc);
34 if (stream_info_it == streams_.end()) {
35 stream_info_it = streams_.emplace(packet.ssrc, Stream()).first;
36 stream_info_it->second.priority_it = stream_priorities_.end();
37 stream_info_it->second.ssrc = packet.ssrc;
38 }
39
40 Stream* streams_ = &stream_info_it->second;
41
42 if (streams_->priority_it == stream_priorities_.end()) {
43 // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
44 RTC_CHECK(!IsSsrcScheduled(streams_->ssrc));
45 streams_->priority_it = stream_priorities_.emplace(
46 StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc);
47 } else if (packet.priority < streams_->priority_it->first.priority) {
48 // If the priority of this SSRC increased, remove the outdated StreamPrioKey
49 // and insert a new one with the new priority. Note that
50 // RtpPacketSender::Priority uses lower ordinal for higher priority.
51 stream_priorities_.erase(streams_->priority_it);
52 streams_->priority_it = stream_priorities_.emplace(
53 StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc);
54 }
55 RTC_CHECK(streams_->priority_it != stream_priorities_.end());
56
57 packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms);
58
59 // In order to figure out how much time a packet has spent in the queue while
60 // not in a paused state, we subtract the total amount of time the queue has
61 // been paused so far, and when the packet is poped we subtract the total
62 // amount of time the queue has been paused at that moment. This way we
63 // subtract the total amount of time the packet has spent in the queue while
64 // in a paused state.
65 UpdateQueueTime(packet.enqueue_time_ms);
66 packet.enqueue_time_ms -= pause_time_sum_ms_;
67 streams_->packet_queue.push(packet);
68
69 size_packets_ += 1;
70 size_bytes_ += packet.bytes;
71}
72
philipelccdfcca2017-10-23 12:42:17 +020073const PacketQueue2::Packet& PacketQueue2::BeginPop() {
74 RTC_CHECK(!pop_packet_ && !pop_stream_);
75
76 Stream* stream = GetHighestPriorityStream();
77 pop_stream_.emplace(stream);
78 pop_packet_.emplace(stream->packet_queue.top());
79 stream->packet_queue.pop();
80
81 return *pop_packet_;
philipel881829b2017-10-13 13:27:23 +020082}
83
philipelccdfcca2017-10-23 12:42:17 +020084void PacketQueue2::CancelPop(const Packet& packet) {
85 RTC_CHECK(pop_packet_ && pop_stream_);
86 (*pop_stream_)->packet_queue.push(*pop_packet_);
87 pop_packet_.reset();
88 pop_stream_.reset();
89}
90
91void PacketQueue2::FinalizePop(const Packet& packet) {
philipel881829b2017-10-13 13:27:23 +020092 RTC_CHECK(!paused_);
93 if (!Empty()) {
philipelccdfcca2017-10-23 12:42:17 +020094 RTC_CHECK(pop_packet_ && pop_stream_);
95 Stream* stream = *pop_stream_;
96 stream_priorities_.erase(stream->priority_it);
97 const Packet& packet = *pop_packet_;
philipel881829b2017-10-13 13:27:23 +020098
99 // Calculate the total amount of time spent by this packet in the queue
100 // while in a non-paused state. Note that the |pause_time_sum_ms_| was
101 // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
102 // by subtracting it now we effectively remove the time spent in in the
103 // queue while in a paused state.
104 int64_t time_in_non_paused_state_ms =
105 time_last_updated_ - packet.enqueue_time_ms - pause_time_sum_ms_;
106 queue_time_sum_ms_ -= time_in_non_paused_state_ms;
107
108 RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end());
109 enqueue_times_.erase(packet.enqueue_time_it);
110
111 // Update |bytes| of this stream. The general idea is that the stream that
112 // has sent the least amount of bytes should have the highest priority.
113 // The problem with that is if streams send with different rates, in which
114 // case a "budget" will be built up for the stream sending at the lower
115 // rate. To avoid building a too large budget we limit |bytes| to be within
116 // kMaxLeading bytes of the stream that has sent the most amount of bytes.
philipelccdfcca2017-10-23 12:42:17 +0200117 stream->bytes =
118 std::max(stream->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes);
119 max_bytes_ = std::max(max_bytes_, stream->bytes);
philipel881829b2017-10-13 13:27:23 +0200120
121 size_bytes_ -= packet.bytes;
122 size_packets_ -= 1;
123 RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0);
philipel881829b2017-10-13 13:27:23 +0200124
125 // If there are packets left to be sent, schedule the stream again.
philipelccdfcca2017-10-23 12:42:17 +0200126 RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
127 if (stream->packet_queue.empty()) {
128 stream->priority_it = stream_priorities_.end();
philipel881829b2017-10-13 13:27:23 +0200129 } else {
philipelccdfcca2017-10-23 12:42:17 +0200130 RtpPacketSender::Priority priority = stream->packet_queue.top().priority;
131 stream->priority_it = stream_priorities_.emplace(
132 StreamPrioKey(priority, stream->bytes), stream->ssrc);
philipel881829b2017-10-13 13:27:23 +0200133 }
philipelccdfcca2017-10-23 12:42:17 +0200134
135 pop_packet_.reset();
136 pop_stream_.reset();
philipel881829b2017-10-13 13:27:23 +0200137 }
138}
139
140bool PacketQueue2::Empty() const {
141 RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) ||
142 (stream_priorities_.empty() && size_packets_ == 0));
143 return stream_priorities_.empty();
144}
145
146size_t PacketQueue2::SizeInPackets() const {
147 return size_packets_;
148}
149
150uint64_t PacketQueue2::SizeInBytes() const {
151 return size_bytes_;
152}
153
154int64_t PacketQueue2::OldestEnqueueTimeMs() const {
155 if (Empty())
156 return 0;
157 RTC_CHECK(!enqueue_times_.empty());
158 return *enqueue_times_.begin();
159}
160
161void PacketQueue2::UpdateQueueTime(int64_t timestamp_ms) {
162 RTC_CHECK_GE(timestamp_ms, time_last_updated_);
163 if (timestamp_ms == time_last_updated_)
164 return;
165
166 int64_t delta_ms = timestamp_ms - time_last_updated_;
167
168 if (paused_) {
169 pause_time_sum_ms_ += delta_ms;
170 } else {
171 queue_time_sum_ms_ += delta_ms * size_packets_;
172 }
173
174 time_last_updated_ = timestamp_ms;
175}
176
177void PacketQueue2::SetPauseState(bool paused, int64_t timestamp_ms) {
178 if (paused_ == paused)
179 return;
180 UpdateQueueTime(timestamp_ms);
181 paused_ = paused;
182}
183
184int64_t PacketQueue2::AverageQueueTimeMs() const {
185 if (Empty())
186 return 0;
187 return queue_time_sum_ms_ / size_packets_;
188}
189
190PacketQueue2::Stream* PacketQueue2::GetHighestPriorityStream() {
191 RTC_CHECK(!stream_priorities_.empty());
192 uint32_t ssrc = stream_priorities_.begin()->second;
193
194 auto stream_info_it = streams_.find(ssrc);
195 RTC_CHECK(stream_info_it != streams_.end());
196 RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
197 RTC_CHECK(!stream_info_it->second.packet_queue.empty());
198 return &stream_info_it->second;
199}
200
201bool PacketQueue2::IsSsrcScheduled(uint32_t ssrc) const {
202 for (const auto& scheduled_stream : stream_priorities_) {
203 if (scheduled_stream.second == ssrc)
204 return true;
205 }
206 return false;
207}
208
209} // namespace webrtc