philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 1 | /* |
| 2 | * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. |
| 3 | * |
| 4 | * Use of this source code is governed by a BSD-style license |
| 5 | * that can be found in the LICENSE file in the root of the source |
| 6 | * tree. An additional intellectual property rights grant can be found |
| 7 | * in the file PATENTS. All contributing project authors may |
| 8 | * be found in the AUTHORS file in the root of the source tree. |
| 9 | */ |
| 10 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 11 | #include "modules/pacing/round_robin_packet_queue.h" |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 12 | |
| 13 | #include <algorithm> |
| 14 | |
| 15 | #include "rtc_base/checks.h" |
| 16 | #include "system_wrappers/include/clock.h" |
| 17 | |
| 18 | namespace webrtc { |
| 19 | |
Erik Språng | 9681675 | 2018-09-04 18:40:19 +0200 | [diff] [blame] | 20 | RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {} |
Mirko Bonadei | b471c90 | 2018-07-18 14:11:27 +0200 | [diff] [blame] | 21 | RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default; |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 22 | RoundRobinPacketQueue::Stream::~Stream() {} |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 23 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 24 | RoundRobinPacketQueue::RoundRobinPacketQueue(const Clock* clock) |
Erik Språng | 9681675 | 2018-09-04 18:40:19 +0200 | [diff] [blame] | 25 | : time_last_updated_(clock->TimeInMilliseconds()) {} |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 26 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 27 | RoundRobinPacketQueue::~RoundRobinPacketQueue() {} |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 28 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 29 | void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 30 | Packet packet(packet_to_insert); |
| 31 | |
| 32 | auto stream_info_it = streams_.find(packet.ssrc); |
| 33 | if (stream_info_it == streams_.end()) { |
| 34 | stream_info_it = streams_.emplace(packet.ssrc, Stream()).first; |
| 35 | stream_info_it->second.priority_it = stream_priorities_.end(); |
| 36 | stream_info_it->second.ssrc = packet.ssrc; |
| 37 | } |
| 38 | |
| 39 | Stream* streams_ = &stream_info_it->second; |
| 40 | |
| 41 | if (streams_->priority_it == stream_priorities_.end()) { |
| 42 | // If the SSRC is not currently scheduled, add it to |stream_priorities_|. |
| 43 | RTC_CHECK(!IsSsrcScheduled(streams_->ssrc)); |
| 44 | streams_->priority_it = stream_priorities_.emplace( |
| 45 | StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc); |
| 46 | } else if (packet.priority < streams_->priority_it->first.priority) { |
| 47 | // If the priority of this SSRC increased, remove the outdated StreamPrioKey |
| 48 | // and insert a new one with the new priority. Note that |
| 49 | // RtpPacketSender::Priority uses lower ordinal for higher priority. |
| 50 | stream_priorities_.erase(streams_->priority_it); |
| 51 | streams_->priority_it = stream_priorities_.emplace( |
| 52 | StreamPrioKey(packet.priority, streams_->bytes), packet.ssrc); |
| 53 | } |
| 54 | RTC_CHECK(streams_->priority_it != stream_priorities_.end()); |
| 55 | |
| 56 | packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms); |
| 57 | |
| 58 | // In order to figure out how much time a packet has spent in the queue while |
| 59 | // not in a paused state, we subtract the total amount of time the queue has |
| 60 | // been paused so far, and when the packet is poped we subtract the total |
| 61 | // amount of time the queue has been paused at that moment. This way we |
| 62 | // subtract the total amount of time the packet has spent in the queue while |
| 63 | // in a paused state. |
| 64 | UpdateQueueTime(packet.enqueue_time_ms); |
| 65 | packet.enqueue_time_ms -= pause_time_sum_ms_; |
| 66 | streams_->packet_queue.push(packet); |
| 67 | |
| 68 | size_packets_ += 1; |
| 69 | size_bytes_ += packet.bytes; |
| 70 | } |
| 71 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 72 | const PacketQueueInterface::Packet& RoundRobinPacketQueue::BeginPop() { |
philipel | ccdfcca | 2017-10-23 12:42:17 +0200 | [diff] [blame] | 73 | RTC_CHECK(!pop_packet_ && !pop_stream_); |
| 74 | |
| 75 | Stream* stream = GetHighestPriorityStream(); |
| 76 | pop_stream_.emplace(stream); |
| 77 | pop_packet_.emplace(stream->packet_queue.top()); |
| 78 | stream->packet_queue.pop(); |
| 79 | |
| 80 | return *pop_packet_; |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 81 | } |
| 82 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 83 | void RoundRobinPacketQueue::CancelPop(const Packet& packet) { |
philipel | ccdfcca | 2017-10-23 12:42:17 +0200 | [diff] [blame] | 84 | RTC_CHECK(pop_packet_ && pop_stream_); |
| 85 | (*pop_stream_)->packet_queue.push(*pop_packet_); |
| 86 | pop_packet_.reset(); |
| 87 | pop_stream_.reset(); |
| 88 | } |
| 89 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 90 | void RoundRobinPacketQueue::FinalizePop(const Packet& packet) { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 91 | if (!Empty()) { |
philipel | ccdfcca | 2017-10-23 12:42:17 +0200 | [diff] [blame] | 92 | RTC_CHECK(pop_packet_ && pop_stream_); |
| 93 | Stream* stream = *pop_stream_; |
| 94 | stream_priorities_.erase(stream->priority_it); |
| 95 | const Packet& packet = *pop_packet_; |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 96 | |
| 97 | // Calculate the total amount of time spent by this packet in the queue |
| 98 | // while in a non-paused state. Note that the |pause_time_sum_ms_| was |
| 99 | // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and |
| 100 | // by subtracting it now we effectively remove the time spent in in the |
| 101 | // queue while in a paused state. |
| 102 | int64_t time_in_non_paused_state_ms = |
| 103 | time_last_updated_ - packet.enqueue_time_ms - pause_time_sum_ms_; |
| 104 | queue_time_sum_ms_ -= time_in_non_paused_state_ms; |
| 105 | |
| 106 | RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end()); |
| 107 | enqueue_times_.erase(packet.enqueue_time_it); |
| 108 | |
| 109 | // Update |bytes| of this stream. The general idea is that the stream that |
| 110 | // has sent the least amount of bytes should have the highest priority. |
| 111 | // The problem with that is if streams send with different rates, in which |
| 112 | // case a "budget" will be built up for the stream sending at the lower |
| 113 | // rate. To avoid building a too large budget we limit |bytes| to be within |
| 114 | // kMaxLeading bytes of the stream that has sent the most amount of bytes. |
philipel | ccdfcca | 2017-10-23 12:42:17 +0200 | [diff] [blame] | 115 | stream->bytes = |
| 116 | std::max(stream->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes); |
| 117 | max_bytes_ = std::max(max_bytes_, stream->bytes); |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 118 | |
| 119 | size_bytes_ -= packet.bytes; |
| 120 | size_packets_ -= 1; |
| 121 | RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0); |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 122 | |
| 123 | // If there are packets left to be sent, schedule the stream again. |
philipel | ccdfcca | 2017-10-23 12:42:17 +0200 | [diff] [blame] | 124 | RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); |
| 125 | if (stream->packet_queue.empty()) { |
| 126 | stream->priority_it = stream_priorities_.end(); |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 127 | } else { |
philipel | ccdfcca | 2017-10-23 12:42:17 +0200 | [diff] [blame] | 128 | RtpPacketSender::Priority priority = stream->packet_queue.top().priority; |
| 129 | stream->priority_it = stream_priorities_.emplace( |
| 130 | StreamPrioKey(priority, stream->bytes), stream->ssrc); |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 131 | } |
philipel | ccdfcca | 2017-10-23 12:42:17 +0200 | [diff] [blame] | 132 | |
| 133 | pop_packet_.reset(); |
| 134 | pop_stream_.reset(); |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 135 | } |
| 136 | } |
| 137 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 138 | bool RoundRobinPacketQueue::Empty() const { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 139 | RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) || |
| 140 | (stream_priorities_.empty() && size_packets_ == 0)); |
| 141 | return stream_priorities_.empty(); |
| 142 | } |
| 143 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 144 | size_t RoundRobinPacketQueue::SizeInPackets() const { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 145 | return size_packets_; |
| 146 | } |
| 147 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 148 | uint64_t RoundRobinPacketQueue::SizeInBytes() const { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 149 | return size_bytes_; |
| 150 | } |
| 151 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 152 | int64_t RoundRobinPacketQueue::OldestEnqueueTimeMs() const { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 153 | if (Empty()) |
| 154 | return 0; |
| 155 | RTC_CHECK(!enqueue_times_.empty()); |
| 156 | return *enqueue_times_.begin(); |
| 157 | } |
| 158 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 159 | void RoundRobinPacketQueue::UpdateQueueTime(int64_t timestamp_ms) { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 160 | RTC_CHECK_GE(timestamp_ms, time_last_updated_); |
| 161 | if (timestamp_ms == time_last_updated_) |
| 162 | return; |
| 163 | |
| 164 | int64_t delta_ms = timestamp_ms - time_last_updated_; |
| 165 | |
| 166 | if (paused_) { |
| 167 | pause_time_sum_ms_ += delta_ms; |
| 168 | } else { |
| 169 | queue_time_sum_ms_ += delta_ms * size_packets_; |
| 170 | } |
| 171 | |
| 172 | time_last_updated_ = timestamp_ms; |
| 173 | } |
| 174 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 175 | void RoundRobinPacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 176 | if (paused_ == paused) |
| 177 | return; |
| 178 | UpdateQueueTime(timestamp_ms); |
| 179 | paused_ = paused; |
| 180 | } |
| 181 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 182 | int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 183 | if (Empty()) |
| 184 | return 0; |
| 185 | return queue_time_sum_ms_ / size_packets_; |
| 186 | } |
| 187 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 188 | RoundRobinPacketQueue::Stream* |
| 189 | RoundRobinPacketQueue::GetHighestPriorityStream() { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 190 | RTC_CHECK(!stream_priorities_.empty()); |
| 191 | uint32_t ssrc = stream_priorities_.begin()->second; |
| 192 | |
| 193 | auto stream_info_it = streams_.find(ssrc); |
| 194 | RTC_CHECK(stream_info_it != streams_.end()); |
| 195 | RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin()); |
| 196 | RTC_CHECK(!stream_info_it->second.packet_queue.empty()); |
| 197 | return &stream_info_it->second; |
| 198 | } |
| 199 | |
Sebastian Jansson | b537496 | 2018-02-07 13:26:38 +0100 | [diff] [blame] | 200 | bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const { |
philipel | 881829b | 2017-10-13 13:27:23 +0200 | [diff] [blame] | 201 | for (const auto& scheduled_stream : stream_priorities_) { |
| 202 | if (scheduled_stream.second == ssrc) |
| 203 | return true; |
| 204 | } |
| 205 | return false; |
| 206 | } |
| 207 | |
| 208 | } // namespace webrtc |