/*
 *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#include "modules/pacing/packet_queue2.h"
12
13#include <algorithm>
14
15#include "rtc_base/checks.h"
16#include "system_wrappers/include/clock.h"
17
18namespace webrtc {
19
20PacketQueue2::Stream::Stream() : bytes(0) {}
21PacketQueue2::Stream::~Stream() {}
22
23PacketQueue2::Packet::Packet(RtpPacketSender::Priority priority,
24 uint32_t ssrc,
25 uint16_t seq_number,
26 int64_t capture_time_ms,
27 int64_t enqueue_time_ms,
28 size_t length_in_bytes,
29 bool retransmission,
30 uint64_t enqueue_order)
31 : priority(priority),
32 ssrc(ssrc),
33 sequence_number(seq_number),
34 capture_time_ms(capture_time_ms),
35 enqueue_time_ms(enqueue_time_ms),
36 bytes(length_in_bytes),
37 retransmission(retransmission),
38 enqueue_order(enqueue_order) {}
39
40PacketQueue2::Packet::Packet(const Packet& other) = default;
41
42PacketQueue2::Packet::~Packet() {}
43
44PacketQueue2::PacketQueue2(const Clock* clock)
45 : clock_(clock), time_last_updated_(clock_->TimeInMilliseconds()) {}
46
47PacketQueue2::~PacketQueue2() {}
48
49void PacketQueue2::Push(const Packet& packet_to_insert) {
50 Packet packet(packet_to_insert);
51
52 auto stream_info_it = streams_.find(packet.ssrc);
53 if (stream_info_it == streams_.end()) {
54 stream_info_it = streams_.emplace(packet.ssrc, Stream()).first;
55 stream_info_it->second.priority_it = stream_priorities_.end();
56 stream_info_it->second.ssrc = packet.ssrc;
57 }
58
59 Stream* streams_ = &stream_info_it->second;
60
61 if (streams_->priority_it == stream_priorities_.end()) {
62 // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
63 RTC_CHECK(!IsSsrcScheduled(streams_->ssrc));
64 streams_->priority_it = stream_priorities_.emplace(
65 StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc);
66 } else if (packet.priority < streams_->priority_it->first.priority) {
67 // If the priority of this SSRC increased, remove the outdated StreamPrioKey
68 // and insert a new one with the new priority. Note that
69 // RtpPacketSender::Priority uses lower ordinal for higher priority.
70 stream_priorities_.erase(streams_->priority_it);
71 streams_->priority_it = stream_priorities_.emplace(
72 StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc);
73 }
74 RTC_CHECK(streams_->priority_it != stream_priorities_.end());
75
76 packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms);
77
78 // In order to figure out how much time a packet has spent in the queue while
79 // not in a paused state, we subtract the total amount of time the queue has
80 // been paused so far, and when the packet is poped we subtract the total
81 // amount of time the queue has been paused at that moment. This way we
82 // subtract the total amount of time the packet has spent in the queue while
83 // in a paused state.
84 UpdateQueueTime(packet.enqueue_time_ms);
85 packet.enqueue_time_ms -= pause_time_sum_ms_;
86 streams_->packet_queue.push(packet);
87
88 size_packets_ += 1;
89 size_bytes_ += packet.bytes;
90}
91
92const PacketQueue2::Packet& PacketQueue2::Top() {
93 return GetHighestPriorityStream()->packet_queue.top();
94}
95
96void PacketQueue2::Pop() {
97 RTC_CHECK(!paused_);
98 if (!Empty()) {
99 Stream* streams_ = GetHighestPriorityStream();
100 stream_priorities_.erase(streams_->priority_it);
101 const Packet& packet = streams_->packet_queue.top();
102
103 // Calculate the total amount of time spent by this packet in the queue
104 // while in a non-paused state. Note that the |pause_time_sum_ms_| was
105 // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
106 // by subtracting it now we effectively remove the time spent in in the
107 // queue while in a paused state.
108 int64_t time_in_non_paused_state_ms =
109 time_last_updated_ - packet.enqueue_time_ms - pause_time_sum_ms_;
110 queue_time_sum_ms_ -= time_in_non_paused_state_ms;
111
112 RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end());
113 enqueue_times_.erase(packet.enqueue_time_it);
114
115 // Update |bytes| of this stream. The general idea is that the stream that
116 // has sent the least amount of bytes should have the highest priority.
117 // The problem with that is if streams send with different rates, in which
118 // case a "budget" will be built up for the stream sending at the lower
119 // rate. To avoid building a too large budget we limit |bytes| to be within
120 // kMaxLeading bytes of the stream that has sent the most amount of bytes.
121 streams_->bytes =
122 std::max(streams_->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes);
123 max_bytes_ = std::max(max_bytes_, streams_->bytes);
124
125 size_bytes_ -= packet.bytes;
126 size_packets_ -= 1;
127 RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0);
128 streams_->packet_queue.pop();
129
130 // If there are packets left to be sent, schedule the stream again.
131 RTC_CHECK(!IsSsrcScheduled(streams_->ssrc));
132 if (streams_->packet_queue.empty()) {
133 streams_->priority_it = stream_priorities_.end();
134 } else {
135 RtpPacketSender::Priority priority =
136 streams_->packet_queue.top().priority;
137 streams_->priority_it = stream_priorities_.emplace(
138 StreamPrioKey(priority, streams_->bytes), streams_->ssrc);
139 }
140 }
141}
142
143bool PacketQueue2::Empty() const {
144 RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) ||
145 (stream_priorities_.empty() && size_packets_ == 0));
146 return stream_priorities_.empty();
147}
148
149size_t PacketQueue2::SizeInPackets() const {
150 return size_packets_;
151}
152
153uint64_t PacketQueue2::SizeInBytes() const {
154 return size_bytes_;
155}
156
157int64_t PacketQueue2::OldestEnqueueTimeMs() const {
158 if (Empty())
159 return 0;
160 RTC_CHECK(!enqueue_times_.empty());
161 return *enqueue_times_.begin();
162}
163
164void PacketQueue2::UpdateQueueTime(int64_t timestamp_ms) {
165 RTC_CHECK_GE(timestamp_ms, time_last_updated_);
166 if (timestamp_ms == time_last_updated_)
167 return;
168
169 int64_t delta_ms = timestamp_ms - time_last_updated_;
170
171 if (paused_) {
172 pause_time_sum_ms_ += delta_ms;
173 } else {
174 queue_time_sum_ms_ += delta_ms * size_packets_;
175 }
176
177 time_last_updated_ = timestamp_ms;
178}
179
180void PacketQueue2::SetPauseState(bool paused, int64_t timestamp_ms) {
181 if (paused_ == paused)
182 return;
183 UpdateQueueTime(timestamp_ms);
184 paused_ = paused;
185}
186
187int64_t PacketQueue2::AverageQueueTimeMs() const {
188 if (Empty())
189 return 0;
190 return queue_time_sum_ms_ / size_packets_;
191}
192
193PacketQueue2::Stream* PacketQueue2::GetHighestPriorityStream() {
194 RTC_CHECK(!stream_priorities_.empty());
195 uint32_t ssrc = stream_priorities_.begin()->second;
196
197 auto stream_info_it = streams_.find(ssrc);
198 RTC_CHECK(stream_info_it != streams_.end());
199 RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
200 RTC_CHECK(!stream_info_it->second.packet_queue.empty());
201 return &stream_info_it->second;
202}
203
204bool PacketQueue2::IsSsrcScheduled(uint32_t ssrc) const {
205 for (const auto& scheduled_stream : stream_priorities_) {
206 if (scheduled_stream.second == ssrc)
207 return true;
208 }
209 return false;
210}
211
212} // namespace webrtc