Cleans up the round robin packet queue.
Usage of this class has now been simplified so that we can do some
cleanup:
* Removes dead code: Push() with 9 args, CancelPop()
* Replaces BeginPop()/CancelPop() with a single Pop() method
* Makes QueuePacket a private class
* Replaces rtp_packets_ with direct ownership from QueuePacket
Bug: webrtc:10809
Change-Id: Iea131ee87d5d920360c71fb180b2af0ea4fc6c7f
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/160007
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29869}
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index 18a4f88..985fb5c 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -602,13 +602,7 @@
}
}
- auto* queued_packet = packet_queue_.BeginPop();
- std::unique_ptr<RtpPacketToSend> rtp_packet;
- if (queued_packet != nullptr) {
- rtp_packet = queued_packet->ReleasePacket();
- packet_queue_.FinalizePop();
- }
- return rtp_packet;
+ return packet_queue_.Pop();
}
void PacingController::OnPacketSent(RtpPacketToSend::Type packet_type,
diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc
index 02e9cd7..16542b3 100644
--- a/modules/pacing/round_robin_packet_queue.cc
+++ b/modules/pacing/round_robin_packet_queue.cc
@@ -27,32 +27,65 @@
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
int priority,
- RtpPacketToSend::Type type,
- uint32_t ssrc,
- uint16_t seq_number,
- int64_t capture_time_ms,
Timestamp enqueue_time,
- DataSize size,
- bool retransmission,
uint64_t enqueue_order,
std::multiset<Timestamp>::iterator enqueue_time_it,
- absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
- packet_it)
- : type_(type),
- priority_(priority),
- ssrc_(ssrc),
- sequence_number_(seq_number),
- capture_time_ms_(capture_time_ms),
+ std::unique_ptr<RtpPacketToSend> packet)
+ : priority_(priority),
enqueue_time_(enqueue_time),
- size_(size),
- retransmission_(retransmission),
enqueue_order_(enqueue_order),
+ is_retransmission_(packet->packet_type() ==
+ RtpPacketToSend::Type::kRetransmission),
enqueue_time_it_(enqueue_time_it),
- packet_it_(packet_it) {}
+ owned_packet_(packet.release()) {}
-std::unique_ptr<RtpPacketToSend>
-RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
- return packet_it_ ? std::move(**packet_it_) : nullptr;
+bool RoundRobinPacketQueue::QueuedPacket::operator<(
+ const RoundRobinPacketQueue::QueuedPacket& other) const {
+ if (priority_ != other.priority_)
+ return priority_ > other.priority_;
+ if (is_retransmission_ != other.is_retransmission_)
+ return other.is_retransmission_;
+
+ return enqueue_order_ > other.enqueue_order_;
+}
+
+int RoundRobinPacketQueue::QueuedPacket::Priority() const {
+ return priority_;
+}
+
+RtpPacketToSend::Type RoundRobinPacketQueue::QueuedPacket::Type() const {
+ return *owned_packet_->packet_type();
+}
+
+uint32_t RoundRobinPacketQueue::QueuedPacket::Ssrc() const {
+ return owned_packet_->Ssrc();
+}
+
+Timestamp RoundRobinPacketQueue::QueuedPacket::EnqueueTime() const {
+ return enqueue_time_;
+}
+
+bool RoundRobinPacketQueue::QueuedPacket::IsRetransmission() const {
+ return Type() == RtpPacketToSend::Type::kRetransmission;
+}
+
+uint64_t RoundRobinPacketQueue::QueuedPacket::EnqueueOrder() const {
+ return enqueue_order_;
+}
+
+DataSize RoundRobinPacketQueue::QueuedPacket::Size(bool count_overhead) const {
+ return DataSize::bytes(count_overhead ? owned_packet_->size()
+ : owned_packet_->payload_size() +
+ owned_packet_->padding_size());
+}
+
+RtpPacketToSend* RoundRobinPacketQueue::QueuedPacket::RtpPacket() const {
+ return owned_packet_;
+}
+
+std::multiset<Timestamp>::iterator
+RoundRobinPacketQueue::QueuedPacket::EnqueueTimeIterator() const {
+ return enqueue_time_it_;
}
void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime(
@@ -60,19 +93,9 @@
enqueue_time_ -= pause_time_sum;
}
-bool RoundRobinPacketQueue::QueuedPacket::operator<(
- const RoundRobinPacketQueue::QueuedPacket& other) const {
- if (priority_ != other.priority_)
- return priority_ > other.priority_;
- if (retransmission_ != other.retransmission_)
- return other.retransmission_;
-
- return enqueue_order_ > other.enqueue_order_;
-}
-
RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {}
RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
-RoundRobinPacketQueue::Stream::~Stream() {}
+RoundRobinPacketQueue::Stream::~Stream() = default;
bool IsEnabled(const WebRtcKeyValueConfig* field_trials, const char* name) {
if (!field_trials) {
@@ -94,112 +117,70 @@
send_side_bwe_with_overhead_(
IsEnabled(field_trials, "WebRTC-SendSideBwe-WithOverhead")) {}
-RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
-
-void RoundRobinPacketQueue::Push(int priority,
- RtpPacketToSend::Type type,
- uint32_t ssrc,
- uint16_t seq_number,
- int64_t capture_time_ms,
- Timestamp enqueue_time,
- DataSize size,
- bool retransmission,
- uint64_t enqueue_order) {
- Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms,
- enqueue_time, size, retransmission, enqueue_order,
- enqueue_times_.insert(enqueue_time), absl::nullopt));
+RoundRobinPacketQueue::~RoundRobinPacketQueue() {
+ // Make sure to release any packets owned by raw pointer in QueuedPacket.
+ while (!Empty()) {
+ Pop();
+ }
}
void RoundRobinPacketQueue::Push(int priority,
Timestamp enqueue_time,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet) {
- uint32_t ssrc = packet->Ssrc();
- uint16_t sequence_number = packet->SequenceNumber();
- int64_t capture_time_ms = packet->capture_time_ms();
- DataSize size =
- DataSize::bytes(send_side_bwe_with_overhead_
- ? packet->size()
- : packet->payload_size() + packet->padding_size());
- auto type = packet->packet_type();
- RTC_DCHECK(type.has_value());
-
- rtp_packets_.push_front(std::move(packet));
- Push(QueuedPacket(
- priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
- size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
- enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
+ RTC_DCHECK(packet->packet_type().has_value());
+ Push(QueuedPacket(priority, enqueue_time, enqueue_order,
+ enqueue_times_.insert(enqueue_time), std::move(packet)));
}
-RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() {
- RTC_CHECK(!pop_packet_ && !pop_stream_);
-
+std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
+ RTC_DCHECK(!Empty());
Stream* stream = GetHighestPriorityStream();
- pop_stream_.emplace(stream);
- pop_packet_.emplace(stream->packet_queue.top());
+ const QueuedPacket& queued_packet = stream->packet_queue.top();
+
+ stream_priorities_.erase(stream->priority_it);
+
+ // Calculate the total amount of time spent by this packet in the queue
+ // while in a non-paused state. Note that the |pause_time_sum_ms_| was
+ // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
+ // by subtracting it now we effectively remove the time spent in in the
+ // queue while in a paused state.
+ TimeDelta time_in_non_paused_state =
+ time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;
+ queue_time_sum_ -= time_in_non_paused_state;
+
+ RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());
+ enqueue_times_.erase(queued_packet.EnqueueTimeIterator());
+
+ // Update |bytes| of this stream. The general idea is that the stream that
+ // has sent the least amount of bytes should have the highest priority.
+ // The problem with that is if streams send with different rates, in which
+ // case a "budget" will be built up for the stream sending at the lower
+ // rate. To avoid building a too large budget we limit |bytes| to be within
+ // kMaxLeading bytes of the stream that has sent the most amount of bytes.
+ DataSize packet_size = queued_packet.Size(send_side_bwe_with_overhead_);
+ stream->size =
+ std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);
+ max_size_ = std::max(max_size_, stream->size);
+
+ size_ -= packet_size;
+ size_packets_ -= 1;
+ RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
+
+ std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());
stream->packet_queue.pop();
- return &pop_packet_.value();
-}
-
-void RoundRobinPacketQueue::CancelPop() {
- RTC_CHECK(pop_packet_ && pop_stream_);
- (*pop_stream_)->packet_queue.push(*pop_packet_);
- pop_packet_.reset();
- pop_stream_.reset();
-}
-
-void RoundRobinPacketQueue::FinalizePop() {
- if (!Empty()) {
- RTC_CHECK(pop_packet_ && pop_stream_);
- Stream* stream = *pop_stream_;
- stream_priorities_.erase(stream->priority_it);
- const QueuedPacket& packet = *pop_packet_;
-
- // Calculate the total amount of time spent by this packet in the queue
- // while in a non-paused state. Note that the |pause_time_sum_ms_| was
- // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
- // by subtracting it now we effectively remove the time spent in in the
- // queue while in a paused state.
- TimeDelta time_in_non_paused_state =
- time_last_updated_ - packet.enqueue_time() - pause_time_sum_;
- queue_time_sum_ -= time_in_non_paused_state;
-
- RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
- enqueue_times_.erase(packet.EnqueueTimeIterator());
-
- auto packet_it = packet.PacketIterator();
- if (packet_it) {
- rtp_packets_.erase(*packet_it);
- }
-
- // Update |bytes| of this stream. The general idea is that the stream that
- // has sent the least amount of bytes should have the highest priority.
- // The problem with that is if streams send with different rates, in which
- // case a "budget" will be built up for the stream sending at the lower
- // rate. To avoid building a too large budget we limit |bytes| to be within
- // kMaxLeading bytes of the stream that has sent the most amount of bytes.
- stream->size =
- std::max(stream->size + packet.size(), max_size_ - kMaxLeadingSize);
- max_size_ = std::max(max_size_, stream->size);
-
- size_ -= packet.size();
- size_packets_ -= 1;
- RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
-
- // If there are packets left to be sent, schedule the stream again.
- RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
- if (stream->packet_queue.empty()) {
- stream->priority_it = stream_priorities_.end();
- } else {
- int priority = stream->packet_queue.top().priority();
- stream->priority_it = stream_priorities_.emplace(
- StreamPrioKey(priority, stream->size), stream->ssrc);
- }
-
- pop_packet_.reset();
- pop_stream_.reset();
+ // If there are packets left to be sent, schedule the stream again.
+ RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
+ if (stream->packet_queue.empty()) {
+ stream->priority_it = stream_priorities_.end();
+ } else {
+ int priority = stream->packet_queue.top().Priority();
+ stream->priority_it = stream_priorities_.emplace(
+ StreamPrioKey(priority, stream->size), stream->ssrc);
}
+
+ return rtp_packet;
}
bool RoundRobinPacketQueue::Empty() const {
@@ -223,7 +204,7 @@
uint32_t ssrc = stream_priorities_.begin()->second;
auto stream_info_it = streams_.find(ssrc);
- return stream_info_it->second.packet_queue.top().type() ==
+ return stream_info_it->second.packet_queue.top().Type() ==
RtpPacketToSend::Type::kAudio;
}
@@ -264,11 +245,11 @@
}
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
- auto stream_info_it = streams_.find(packet.ssrc());
+ auto stream_info_it = streams_.find(packet.Ssrc());
if (stream_info_it == streams_.end()) {
- stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
+ stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;
stream_info_it->second.priority_it = stream_priorities_.end();
- stream_info_it->second.ssrc = packet.ssrc();
+ stream_info_it->second.ssrc = packet.Ssrc();
}
Stream* stream = &stream_info_it->second;
@@ -277,14 +258,14 @@
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(
- StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
- } else if (packet.priority() < stream->priority_it->first.priority) {
+ StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
+ } else if (packet.Priority() < stream->priority_it->first.priority) {
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
// and insert a new one with the new priority. Note that |priority_| uses
// lower ordinal for higher priority.
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(
- StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
+ StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
}
RTC_CHECK(stream->priority_it != stream_priorities_.end());
@@ -294,11 +275,11 @@
// amount of time the queue has been paused at that moment. This way we
// subtract the total amount of time the packet has spent in the queue while
// in a paused state.
- UpdateQueueTime(packet.enqueue_time());
+ UpdateQueueTime(packet.EnqueueTime());
packet.SubtractPauseTime(pause_time_sum_);
size_packets_ += 1;
- size_ += packet.size();
+ size_ += packet.Size(send_side_bwe_with_overhead_);
stream->packet_queue.push(packet);
}
diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h
index dcd25ad..96b458f 100644
--- a/modules/pacing/round_robin_packet_queue.h
+++ b/modules/pacing/round_robin_packet_queue.h
@@ -37,80 +37,11 @@
const WebRtcKeyValueConfig* field_trials);
~RoundRobinPacketQueue();
- struct QueuedPacket {
- public:
- QueuedPacket(
- int priority,
- RtpPacketToSend::Type type,
- uint32_t ssrc,
- uint16_t seq_number,
- int64_t capture_time_ms,
- Timestamp enqueue_time,
- DataSize size,
- bool retransmission,
- uint64_t enqueue_order,
- std::multiset<Timestamp>::iterator enqueue_time_it,
- absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
- packet_it);
- QueuedPacket(const QueuedPacket& rhs);
- ~QueuedPacket();
-
- bool operator<(const QueuedPacket& other) const;
-
- int priority() const { return priority_; }
- RtpPacketToSend::Type type() const { return type_; }
- uint32_t ssrc() const { return ssrc_; }
- uint16_t sequence_number() const { return sequence_number_; }
- int64_t capture_time_ms() const { return capture_time_ms_; }
- Timestamp enqueue_time() const { return enqueue_time_; }
- DataSize size() const { return size_; }
- bool is_retransmission() const { return retransmission_; }
- uint64_t enqueue_order() const { return enqueue_order_; }
- std::unique_ptr<RtpPacketToSend> ReleasePacket();
-
- // For internal use.
- absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
- PacketIterator() const {
- return packet_it_;
- }
- std::multiset<Timestamp>::iterator EnqueueTimeIterator() const {
- return enqueue_time_it_;
- }
- void SubtractPauseTime(TimeDelta pause_time_sum);
-
- private:
- RtpPacketToSend::Type type_;
- int priority_;
- uint32_t ssrc_;
- uint16_t sequence_number_;
- int64_t capture_time_ms_; // Absolute time of frame capture.
- Timestamp enqueue_time_; // Absolute time of pacer queue entry.
- DataSize size_;
- bool retransmission_;
- uint64_t enqueue_order_;
- std::multiset<Timestamp>::iterator enqueue_time_it_;
- // Iterator into |rtp_packets_| where the memory for RtpPacket is owned,
- // if applicable.
- absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
- packet_it_;
- };
-
- void Push(int priority,
- RtpPacketToSend::Type type,
- uint32_t ssrc,
- uint16_t seq_number,
- int64_t capture_time_ms,
- Timestamp enqueue_time,
- DataSize size,
- bool retransmission,
- uint64_t enqueue_order);
void Push(int priority,
Timestamp enqueue_time,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet);
- QueuedPacket* BeginPop();
- void CancelPop();
- void FinalizePop();
+ std::unique_ptr<RtpPacketToSend> Pop();
bool Empty() const;
size_t SizeInPackets() const;
@@ -123,6 +54,41 @@
void SetPauseState(bool paused, Timestamp now);
private:
+ struct QueuedPacket {
+ public:
+ QueuedPacket(int priority,
+ Timestamp enqueue_time,
+ uint64_t enqueue_order,
+ std::multiset<Timestamp>::iterator enqueue_time_it,
+ std::unique_ptr<RtpPacketToSend> packet);
+ QueuedPacket(const QueuedPacket& rhs);
+ ~QueuedPacket();
+
+ bool operator<(const QueuedPacket& other) const;
+
+ int Priority() const;
+ RtpPacketToSend::Type Type() const;
+ uint32_t Ssrc() const;
+ Timestamp EnqueueTime() const;
+ bool IsRetransmission() const;
+ uint64_t EnqueueOrder() const;
+ DataSize Size(bool count_overhead) const;
+ RtpPacketToSend* RtpPacket() const;
+
+ std::multiset<Timestamp>::iterator EnqueueTimeIterator() const;
+ void SubtractPauseTime(TimeDelta pause_time_sum);
+
+ private:
+ int priority_;
+ Timestamp enqueue_time_; // Absolute time of pacer queue entry.
+ uint64_t enqueue_order_;
+ bool is_retransmission_; // Cached for performance.
+ std::multiset<Timestamp>::iterator enqueue_time_it_;
+ // Raw pointer since priority_queue doesn't allow for moving
+ // out of the container.
+ RtpPacketToSend* owned_packet_;
+ };
+
struct StreamPrioKey {
StreamPrioKey(int priority, DataSize size)
: priority(priority), size(size) {}
@@ -163,8 +129,6 @@
bool IsSsrcScheduled(uint32_t ssrc) const;
Timestamp time_last_updated_;
- absl::optional<QueuedPacket> pop_packet_;
- absl::optional<Stream*> pop_stream_;
bool paused_;
size_t size_packets_;
@@ -186,12 +150,6 @@
// the age of the oldest packet in the queue.
std::multiset<Timestamp> enqueue_times_;
- // List of RTP packets to be sent, not necessarily in the order they will be
- // sent. PacketInfo.packet_it will point to an entry in this list, or the
- // end iterator of this list if queue does not have direct ownership of the
- // packet.
- std::list<std::unique_ptr<RtpPacketToSend>> rtp_packets_;
-
const bool send_side_bwe_with_overhead_;
};
} // namespace webrtc