Allow RTP module thread checking to know PacketRouter status.
Since https://webrtc-review.googlesource.com/c/src/+/228433 both audio
and video now only call Get/SetRtpState while not registered to the
packet router.
We can thus remove the lock around packet sequencer and just use a
thread checker.
Bug: webrtc:11340
Change-Id: Ie6865cc96c36208700c31a75747ff4dd992ce68d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228435
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34755}
diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc
index 7f0e9da..f95c743 100644
--- a/call/rtp_video_sender.cc
+++ b/call/rtp_video_sender.cc
@@ -451,6 +451,15 @@
// Signal congestion controller this object is ready for OnPacket* callbacks.
transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver(
rtp_config_.ssrcs, this);
+
+ // Construction happens on the worker thread (see Call::CreateVideoSendStream)
+ // but subseqeuent calls to the RTP state will happen on one of two threads:
+ // * The pacer thread for actually sending packets.
+ // * The transport thread when tearing down and quering GetRtpState().
+ // Detach thread checkers.
+ for (const RtpStreamSender& stream : rtp_streams_) {
+ stream.rtp_rtcp->OnPacketSendingThreadSwitched();
+ }
}
RtpVideoSender::~RtpVideoSender() {
diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc
index 3b1278e..fcc7ee3 100644
--- a/modules/pacing/packet_router.cc
+++ b/modules/pacing/packet_router.cc
@@ -68,6 +68,10 @@
void PacketRouter::AddSendRtpModuleToMap(RtpRtcpInterface* rtp_module,
uint32_t ssrc) {
RTC_DCHECK(send_modules_map_.find(ssrc) == send_modules_map_.end());
+
+ // Signal to module that the pacer thread is attached and can send packets.
+ rtp_module->OnPacketSendingThreadSwitched();
+
// Always keep the audio modules at the back of the list, so that when we
// iterate over the modules in order to find one that can send padding we
// will prioritize video. This is important to make sure they are counted
@@ -102,6 +106,7 @@
if (last_send_module_ == rtp_module) {
last_send_module_ = nullptr;
}
+ rtp_module->OnPacketSendingThreadSwitched();
}
void PacketRouter::AddReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender,
diff --git a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
index 59b8cf1..33c5a9b 100644
--- a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
+++ b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
@@ -115,6 +115,7 @@
(rtc::ArrayView<const uint16_t> sequence_numbers),
(const, override));
MOCK_METHOD(size_t, ExpectedPerPacketOverhead, (), (const, override));
+ MOCK_METHOD(void, OnPacketSendingThreadSwitched, (), (override));
MOCK_METHOD(RtcpMode, RTCP, (), (const, override));
MOCK_METHOD(void, SetRTCPStatus, (RtcpMode method), (override));
MOCK_METHOD(int32_t,
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
index f6f9afb..34e8429 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
@@ -490,6 +490,8 @@
return rtp_sender_->packet_generator.ExpectedPerPacketOverhead();
}
+void ModuleRtpRtcpImpl::OnPacketSendingThreadSwitched() {}
+
size_t ModuleRtpRtcpImpl::MaxRtpPacketSize() const {
RTC_DCHECK(rtp_sender_);
return rtp_sender_->packet_generator.MaxRtpPacketSize();
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl.h b/modules/rtp_rtcp/source/rtp_rtcp_impl.h
index 81b1170..2ffe013 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl.h
@@ -157,6 +157,8 @@
size_t ExpectedPerPacketOverhead() const override;
+ void OnPacketSendingThreadSwitched() override;
+
// RTCP part.
// Get RTCP status.
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
index ec42f61..7d33ca0 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
@@ -64,22 +64,22 @@
const RtpRtcpInterface::Configuration& config)
: packet_history(config.clock, config.enable_rtx_padding_prioritization),
deferred_sequencing_(config.use_deferred_sequencing),
- sequencer_(config.local_media_ssrc,
- config.rtx_send_ssrc,
- /*require_marker_before_media_padding=*/!config.audio,
- config.clock),
+ sequencer(config.local_media_ssrc,
+ config.rtx_send_ssrc,
+ /*require_marker_before_media_padding=*/!config.audio,
+ config.clock),
packet_sender(config, &packet_history),
non_paced_sender(&packet_sender, this, config.use_deferred_sequencing),
packet_generator(
config,
&packet_history,
config.paced_sender ? config.paced_sender : &non_paced_sender,
- config.use_deferred_sequencing ? nullptr : &sequencer_) {}
+ config.use_deferred_sequencing ? nullptr : &sequencer) {}
void ModuleRtpRtcpImpl2::RtpSenderContext::AssignSequenceNumber(
RtpPacketToSend* packet) {
+ RTC_DCHECK_RUN_ON(&sequencing_checker);
if (deferred_sequencing_) {
- MutexLock lock(&mutex_sequencer_);
- sequencer_.Sequence(*packet);
+ sequencer.Sequence(*packet);
} else {
packet_generator.AssignSequenceNumber(packet);
}
@@ -101,10 +101,10 @@
rtt_stats_(configuration.rtt_stats),
rtt_ms_(0) {
RTC_DCHECK(worker_queue_);
- packet_sequence_checker_.Detach();
- pacer_thread_checker_.Detach();
+ rtcp_thread_checker_.Detach();
if (!configuration.receiver_only) {
rtp_sender_ = std::make_unique<RtpSenderContext>(configuration);
+ rtp_sender_->sequencing_checker.Detach();
// Make sure rtcp sender use same timestamp offset as rtp sender.
rtcp_sender_.SetTimestampOffset(
rtp_sender_->packet_generator.TimestampOffset());
@@ -162,7 +162,7 @@
void ModuleRtpRtcpImpl2::IncomingRtcpPacket(const uint8_t* rtcp_packet,
const size_t length) {
- RTC_DCHECK_RUN_ON(&packet_sequence_checker_);
+ RTC_DCHECK_RUN_ON(&rtcp_thread_checker_);
rtcp_receiver_.IncomingPacket(rtcp_packet, length);
}
@@ -187,20 +187,19 @@
}
uint16_t ModuleRtpRtcpImpl2::SequenceNumber() const {
+ RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
if (rtp_sender_->deferred_sequencing_) {
- MutexLock lock(&rtp_sender_->mutex_sequencer_);
- return rtp_sender_->sequencer_.media_sequence_number();
+ return rtp_sender_->sequencer.media_sequence_number();
}
return rtp_sender_->packet_generator.SequenceNumber();
}
// Set SequenceNumber, default is a random number.
void ModuleRtpRtcpImpl2::SetSequenceNumber(const uint16_t seq_num) {
+ RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
if (rtp_sender_->deferred_sequencing_) {
- RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
- MutexLock lock(&rtp_sender_->mutex_sequencer_);
- if (rtp_sender_->sequencer_.media_sequence_number() != seq_num) {
- rtp_sender_->sequencer_.set_media_sequence_number(seq_num);
+ if (rtp_sender_->sequencer.media_sequence_number() != seq_num) {
+ rtp_sender_->sequencer.set_media_sequence_number(seq_num);
rtp_sender_->packet_history.Clear();
}
} else {
@@ -209,36 +208,36 @@
}
void ModuleRtpRtcpImpl2::SetRtpState(const RtpState& rtp_state) {
+ RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
rtp_sender_->packet_generator.SetRtpState(rtp_state);
if (rtp_sender_->deferred_sequencing_) {
- MutexLock lock(&rtp_sender_->mutex_sequencer_);
- rtp_sender_->sequencer_.SetRtpState(rtp_state);
+ rtp_sender_->sequencer.SetRtpState(rtp_state);
}
rtcp_sender_.SetTimestampOffset(rtp_state.start_timestamp);
}
void ModuleRtpRtcpImpl2::SetRtxState(const RtpState& rtp_state) {
+ RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
rtp_sender_->packet_generator.SetRtxRtpState(rtp_state);
if (rtp_sender_->deferred_sequencing_) {
- MutexLock lock(&rtp_sender_->mutex_sequencer_);
- rtp_sender_->sequencer_.set_rtx_sequence_number(rtp_state.sequence_number);
+ rtp_sender_->sequencer.set_rtx_sequence_number(rtp_state.sequence_number);
}
}
RtpState ModuleRtpRtcpImpl2::GetRtpState() const {
+ RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
RtpState state = rtp_sender_->packet_generator.GetRtpState();
if (rtp_sender_->deferred_sequencing_) {
- MutexLock lock(&rtp_sender_->mutex_sequencer_);
- rtp_sender_->sequencer_.PopulateRtpState(state);
+ rtp_sender_->sequencer.PopulateRtpState(state);
}
return state;
}
RtpState ModuleRtpRtcpImpl2::GetRtxState() const {
+ RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
RtpState state = rtp_sender_->packet_generator.GetRtxRtpState();
if (rtp_sender_->deferred_sequencing_) {
- MutexLock lock(&rtp_sender_->mutex_sequencer_);
- state.sequence_number = rtp_sender_->sequencer_.rtx_sequence_number();
+ state.sequence_number = rtp_sender_->sequencer.rtx_sequence_number();
}
return state;
}
@@ -249,7 +248,7 @@
}
uint32_t ModuleRtpRtcpImpl2::local_media_ssrc() const {
- RTC_DCHECK_RUN_ON(&packet_sequence_checker_);
+ RTC_DCHECK_RUN_ON(&rtcp_thread_checker_);
RTC_DCHECK_EQ(rtcp_receiver_.local_media_ssrc(), rtcp_sender_.SSRC());
return rtcp_receiver_.local_media_ssrc();
}
@@ -335,12 +334,6 @@
// updated.
void ModuleRtpRtcpImpl2::SetSendingMediaStatus(const bool sending) {
if (rtp_sender_) {
- // Turning on or off sending status indicates module being set
- // up or torn down, detach thread checker since subsequent calls
- // may be from a different thread.
- if (rtp_sender_->packet_generator.SendingMedia() != sending) {
- pacer_thread_checker_.Detach();
- }
rtp_sender_->packet_generator.SetSendingMediaStatus(sending);
} else {
RTC_DCHECK(!sending);
@@ -389,15 +382,14 @@
bool ModuleRtpRtcpImpl2::TrySendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK(rtp_sender_);
- RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
+ RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
if (rtp_sender_->deferred_sequencing_) {
if (!rtp_sender_->packet_generator.SendingMedia()) {
return false;
}
- MutexLock lock(&rtp_sender_->mutex_sequencer_);
if (packet->packet_type() == RtpPacketMediaType::kPadding &&
packet->Ssrc() == rtp_sender_->packet_generator.SSRC() &&
- !rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc()) {
+ !rtp_sender_->sequencer.CanSendPaddingOnMediaSsrc()) {
// New media packet preempted this generated padding packet, discard it.
return false;
}
@@ -405,7 +397,7 @@
packet->packet_type() == RtpPacketMediaType::kForwardErrorCorrection &&
packet->Ssrc() == rtp_sender_->packet_generator.FlexfecSsrc();
if (!is_flexfec) {
- rtp_sender_->sequencer_.Sequence(*packet);
+ rtp_sender_->sequencer.Sequence(*packet);
}
} else if (!rtp_sender_->packet_generator.SendingMedia()) {
return false;
@@ -425,7 +417,7 @@
std::vector<std::unique_ptr<RtpPacketToSend>>
ModuleRtpRtcpImpl2::FetchFecPackets() {
RTC_DCHECK(rtp_sender_);
- RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
+ RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
auto fec_packets = rtp_sender_->packet_sender.FetchFecPackets();
if (!fec_packets.empty() && !rtp_sender_->deferred_sequencing_) {
// Only assign sequence numbers for FEC packets in non-deferred mode, and
@@ -460,17 +452,15 @@
std::vector<std::unique_ptr<RtpPacketToSend>>
ModuleRtpRtcpImpl2::GeneratePadding(size_t target_size_bytes) {
RTC_DCHECK(rtp_sender_);
- RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
- MutexLock lock(&rtp_sender_->mutex_sequencer_);
+ RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
// `can_send_padding_on_media_ssrc` set to false when deferred sequencing
// is off. It will be ignored in that case, RTPSender will internally query
- // `sequencer_` while holding the send lock instead.
+ // `sequencer` while holding the send lock instead.
return rtp_sender_->packet_generator.GeneratePadding(
target_size_bytes, rtp_sender_->packet_sender.MediaHasBeenSent(),
-
rtp_sender_->deferred_sequencing_
- ? rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc()
+ ? rtp_sender_->sequencer.CanSendPaddingOnMediaSsrc()
: false);
}
@@ -488,6 +478,11 @@
return rtp_sender_->packet_generator.ExpectedPerPacketOverhead();
}
+void ModuleRtpRtcpImpl2::OnPacketSendingThreadSwitched() {
+ // Ownership of sequencing is being transferred to another thread.
+ rtp_sender_->sequencing_checker.Detach();
+}
+
size_t ModuleRtpRtcpImpl2::MaxRtpPacketSize() const {
RTC_DCHECK(rtp_sender_);
return rtp_sender_->packet_generator.MaxRtpPacketSize();
@@ -730,7 +725,7 @@
}
void ModuleRtpRtcpImpl2::SetLocalSsrc(uint32_t local_ssrc) {
- RTC_DCHECK_RUN_ON(&packet_sequence_checker_);
+ RTC_DCHECK_RUN_ON(&rtcp_thread_checker_);
rtcp_receiver_.set_local_media_ssrc(local_ssrc);
rtcp_sender_.SetSsrc(local_ssrc);
}
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
index b468bb7..f01c0c0 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
@@ -169,6 +169,8 @@
size_t ExpectedPerPacketOverhead() const override;
+ void OnPacketSendingThreadSwitched() override;
+
// RTCP part.
// Get RTCP status.
@@ -269,12 +271,9 @@
// If false, sequencing is owned by `packet_generator` and can happen on
// several threads. If true, sequencing always happens on the pacer thread.
const bool deferred_sequencing_;
- // TODO(bugs.webrtc.org/11340): Remove lock one we can guarantee that
- // setting/getting rtp state only happens after removal from packet sending
- // code path.
- mutable Mutex mutex_sequencer_;
+ SequenceChecker sequencing_checker;
// Handles sequence number assignment and padding timestamp generation.
- PacketSequencer sequencer_ RTC_GUARDED_BY(mutex_sequencer_);
+ PacketSequencer sequencer RTC_GUARDED_BY(sequencing_checker);
// Handles final time timestamping/stats/etc and handover to Transport.
RtpSenderEgress packet_sender;
// If no paced sender configured, this class will be used to pass packets
@@ -314,8 +313,7 @@
TimeDelta duration);
TaskQueueBase* const worker_queue_;
- RTC_NO_UNIQUE_ADDRESS SequenceChecker packet_sequence_checker_;
- RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_thread_checker_;
+ RTC_NO_UNIQUE_ADDRESS SequenceChecker rtcp_thread_checker_;
std::unique_ptr<RtpSenderContext> rtp_sender_;
RTCPSender rtcp_sender_;
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_interface.h b/modules/rtp_rtcp/source/rtp_rtcp_interface.h
index e90d866..c1a44fa 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_interface.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_interface.h
@@ -347,6 +347,15 @@
// when we expect to send them).
virtual size_t ExpectedPerPacketOverhead() const = 0;
+ // Access to packet state (e.g. sequence numbering) must only be access by
+ // one thread at a time. It may be only one thread, or a construction thread
+ // that calls SetRtpState() - handing over to a pacer thread that calls
+ // TrySendPacket() - and at teardown ownership is handed to a destruciton
+ // thread that calls GetRtpState().
+ // This method is used to signal that "ownership" of the rtp state is being
+ // transferred to another thread.
+ virtual void OnPacketSendingThreadSwitched() = 0;
+
// **************************************************************************
// RTCP
// **************************************************************************