Revert "Revert of Parse FlexFEC RTP headers in Call and add integration with BWE. (patchset #17 id:460001 of https://codereview.webrtc.org/2553863003/ )"
Problem fixed: RTP header extensions were not properly set in tests.
BUG=webrtc:5654
Review-Url: https://codereview.webrtc.org/2593963003
Cr-Commit-Position: refs/heads/master@{#15741}
diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc
index 1c4b7b2..c7998b2 100644
--- a/webrtc/call/call.cc
+++ b/webrtc/call/call.cc
@@ -24,6 +24,7 @@
#include "webrtc/base/checks.h"
#include "webrtc/base/constructormagic.h"
#include "webrtc/base/logging.h"
+#include "webrtc/base/optional.h"
#include "webrtc/base/task_queue.h"
#include "webrtc/base/thread_annotations.h"
#include "webrtc/base/thread_checker.h"
@@ -39,6 +40,8 @@
#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
#include "webrtc/modules/rtp_rtcp/include/rtp_header_parser.h"
#include "webrtc/modules/rtp_rtcp/source/byte_io.h"
+#include "webrtc/modules/rtp_rtcp/source/rtp_header_extension.h"
+#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
#include "webrtc/modules/utility/include/process_thread.h"
#include "webrtc/system_wrappers/include/clock.h"
#include "webrtc/system_wrappers/include/cpu_info.h"
@@ -107,6 +110,8 @@
// Implements RecoveredPacketReceiver.
bool OnRecoveredPacket(const uint8_t* packet, size_t length) override;
+ void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet);
+
void SetBitrateConfig(
const webrtc::Call::Config::BitrateConfig& bitrate_config) override;
@@ -154,6 +159,11 @@
return nullptr;
}
+ rtc::Optional<RtpPacketReceived> ParseRtpPacket(const uint8_t* packet,
+ size_t length,
+ const PacketTime& packet_time)
+ SHARED_LOCKS_REQUIRED(receive_crit_);
+
void UpdateSendHistograms() EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_);
void UpdateReceiveHistograms();
void UpdateHistograms();
@@ -192,6 +202,14 @@
std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
GUARDED_BY(receive_crit_);
+ // Registered RTP header extensions for each stream.
+ // Note that RTP header extensions are negotiated per track ("m= line") in the
+ // SDP, but we have no notion of tracks at the Call level. We therefore store
+ // the RTP header extensions per SSRC instead, which leads to some storage
+ // overhead.
+ std::map<uint32_t, RtpHeaderExtensionMap> received_rtp_header_extensions_
+ GUARDED_BY(receive_crit_);
+
std::unique_ptr<RWLockWrapper> send_crit_;
// Audio and Video send streams are owned by the client that creates them.
std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_ GUARDED_BY(send_crit_);
@@ -345,6 +363,29 @@
Trace::ReturnTrace();
}
+rtc::Optional<RtpPacketReceived> Call::ParseRtpPacket(
+ const uint8_t* packet,
+ size_t length,
+ const PacketTime& packet_time) {
+ RtpPacketReceived parsed_packet;
+ if (!parsed_packet.Parse(packet, length))
+ return rtc::Optional<RtpPacketReceived>();
+
+ auto it = received_rtp_header_extensions_.find(parsed_packet.Ssrc());
+ if (it != received_rtp_header_extensions_.end())
+ parsed_packet.IdentifyExtensions(it->second);
+
+ int64_t arrival_time_ms;
+ if (packet_time.timestamp != -1) {
+ arrival_time_ms = (packet_time.timestamp + 500) / 1000;
+ } else {
+ arrival_time_ms = clock_->TimeInMilliseconds();
+ }
+ parsed_packet.set_arrival_time_ms(arrival_time_ms);
+
+ return rtc::Optional<RtpPacketReceived>(std::move(parsed_packet));
+}
+
void Call::UpdateHistograms() {
RTC_HISTOGRAM_COUNTS_100000(
"WebRTC.Call.LifetimeInSeconds",
@@ -659,25 +700,40 @@
const FlexfecReceiveStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
+
+ RecoveredPacketReceiver* recovered_packet_receiver = this;
FlexfecReceiveStreamImpl* receive_stream =
- new FlexfecReceiveStreamImpl(config, this);
+ new FlexfecReceiveStreamImpl(config, recovered_packet_receiver);
{
WriteLockScoped write_lock(*receive_crit_);
+
+ RTC_DCHECK(flexfec_receive_streams_.find(receive_stream) ==
+ flexfec_receive_streams_.end());
+ flexfec_receive_streams_.insert(receive_stream);
+
for (auto ssrc : config.protected_media_ssrcs)
flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream));
+
RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.remote_ssrc) ==
flexfec_receive_ssrcs_protection_.end());
flexfec_receive_ssrcs_protection_[config.remote_ssrc] = receive_stream;
- flexfec_receive_streams_.insert(receive_stream);
+
+ RTC_DCHECK(received_rtp_header_extensions_.find(config.remote_ssrc) ==
+ received_rtp_header_extensions_.end());
+ RtpHeaderExtensionMap rtp_header_extensions(config.rtp_header_extensions);
+ received_rtp_header_extensions_[config.remote_ssrc] = rtp_header_extensions;
}
+
// TODO(brandtr): Store config in RtcEventLog here.
+
return receive_stream;
}
void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
+
RTC_DCHECK(receive_stream != nullptr);
// There exist no other derived classes of FlexfecReceiveStream,
// so this downcast is safe.
@@ -685,15 +741,12 @@
static_cast<FlexfecReceiveStreamImpl*>(receive_stream);
{
WriteLockScoped write_lock(*receive_crit_);
+
+ uint32_t ssrc = receive_stream_impl->GetConfig().remote_ssrc;
+ received_rtp_header_extensions_.erase(ssrc);
+
// Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
// destroyed.
- auto media_it = flexfec_receive_ssrcs_media_.begin();
- while (media_it != flexfec_receive_ssrcs_media_.end()) {
- if (media_it->second == receive_stream_impl)
- media_it = flexfec_receive_ssrcs_media_.erase(media_it);
- else
- ++media_it;
- }
auto prot_it = flexfec_receive_ssrcs_protection_.begin();
while (prot_it != flexfec_receive_ssrcs_protection_.end()) {
if (prot_it->second == receive_stream_impl)
@@ -701,8 +754,17 @@
else
++prot_it;
}
+ auto media_it = flexfec_receive_ssrcs_media_.begin();
+ while (media_it != flexfec_receive_ssrcs_media_.end()) {
+ if (media_it->second == receive_stream_impl)
+ media_it = flexfec_receive_ssrcs_media_.erase(media_it);
+ else
+ ++media_it;
+ }
+
flexfec_receive_streams_.erase(receive_stream_impl);
}
+
delete receive_stream_impl;
}
@@ -1076,13 +1138,21 @@
if (it != video_receive_ssrcs_.end()) {
received_bytes_per_second_counter_.Add(static_cast<int>(length));
received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
+ // TODO(brandtr): Notify the BWE of received media packets here.
auto status = it->second->DeliverRtp(packet, length, packet_time)
? DELIVERY_OK
: DELIVERY_PACKET_ERROR;
- // Deliver media packets to FlexFEC subsystem.
- auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
- for (auto it = it_bounds.first; it != it_bounds.second; ++it)
- it->second->AddAndProcessReceivedPacket(packet, length);
+ // Deliver media packets to FlexFEC subsystem. RTP header extensions need
+ // not be parsed, as FlexFEC is oblivious to the semantic meaning of the
+ // packet contents beyond the 12 byte RTP base header. The BWE is fed
+ // information about these media packets from the regular media pipeline.
+ rtc::Optional<RtpPacketReceived> parsed_packet =
+ ParseRtpPacket(packet, length, packet_time);
+ if (parsed_packet) {
+ auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
+ for (auto it = it_bounds.first; it != it_bounds.second; ++it)
+ it->second->AddAndProcessReceivedPacket(*parsed_packet);
+ }
if (status == DELIVERY_OK)
event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
return status;
@@ -1091,12 +1161,18 @@
if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
if (it != flexfec_receive_ssrcs_protection_.end()) {
- auto status = it->second->AddAndProcessReceivedPacket(packet, length)
- ? DELIVERY_OK
- : DELIVERY_PACKET_ERROR;
- if (status == DELIVERY_OK)
- event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
- return status;
+ rtc::Optional<RtpPacketReceived> parsed_packet =
+ ParseRtpPacket(packet, length, packet_time);
+ if (parsed_packet) {
+ NotifyBweOfReceivedPacket(*parsed_packet);
+ auto status =
+ it->second->AddAndProcessReceivedPacket(std::move(*parsed_packet))
+ ? DELIVERY_OK
+ : DELIVERY_PACKET_ERROR;
+ if (status == DELIVERY_OK)
+ event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
+ return status;
+ }
}
}
return DELIVERY_UNKNOWN_SSRC;
@@ -1128,5 +1204,12 @@
return it->second->OnRecoveredPacket(packet, length);
}
+void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet) {
+ RTPHeader header;
+ packet.GetHeader(&header);
+ congestion_controller_->OnReceivedPacket(packet.arrival_time_ms(),
+ packet.payload_size(), header);
+}
+
} // namespace internal
} // namespace webrtc
diff --git a/webrtc/call/flexfec_receive_stream.h b/webrtc/call/flexfec_receive_stream.h
index 83b212b..20a57f2 100644
--- a/webrtc/call/flexfec_receive_stream.h
+++ b/webrtc/call/flexfec_receive_stream.h
@@ -59,7 +59,7 @@
bool transport_cc = false;
// RTP header extensions that have been negotiated for this track.
- std::vector<RtpExtension> extensions;
+ std::vector<RtpExtension> rtp_header_extensions;
};
// Starts stream activity.
diff --git a/webrtc/call/flexfec_receive_stream_impl.cc b/webrtc/call/flexfec_receive_stream_impl.cc
index e0e49c1..13390a1 100644
--- a/webrtc/call/flexfec_receive_stream_impl.cc
+++ b/webrtc/call/flexfec_receive_stream_impl.cc
@@ -10,6 +10,8 @@
#include "webrtc/call/flexfec_receive_stream_impl.h"
+#include <utility>
+
#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
@@ -34,12 +36,12 @@
if (!protected_media_ssrcs.empty())
ss << protected_media_ssrcs[i];
ss << "], transport_cc: " << (transport_cc ? "on" : "off");
- ss << ", extensions: [";
+ ss << ", rtp_header_extensions: [";
i = 0;
- for (; i + 1 < extensions.size(); ++i)
- ss << extensions[i].ToString() << ", ";
- if (!extensions.empty())
- ss << extensions[i].ToString();
+ for (; i + 1 < rtp_header_extensions.size(); ++i)
+ ss << rtp_header_extensions[i].ToString() << ", ";
+ if (!rtp_header_extensions.empty())
+ ss << rtp_header_extensions[i].ToString();
ss << "]}";
return ss.str();
}
@@ -49,7 +51,7 @@
// TODO(brandtr): Update this function when we support multistream protection.
std::unique_ptr<FlexfecReceiver> MaybeCreateFlexfecReceiver(
const FlexfecReceiveStream::Config& config,
- RecoveredPacketReceiver* recovered_packet_callback) {
+ RecoveredPacketReceiver* recovered_packet_receiver) {
if (config.payload_type < 0) {
LOG(LS_WARNING) << "Invalid FlexFEC payload type given. "
<< "This FlexfecReceiveStream will therefore be useless.";
@@ -79,18 +81,18 @@
RTC_DCHECK_EQ(1U, config.protected_media_ssrcs.size());
return std::unique_ptr<FlexfecReceiver>(
new FlexfecReceiver(config.remote_ssrc, config.protected_media_ssrcs[0],
- recovered_packet_callback));
+ recovered_packet_receiver));
}
} // namespace
FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl(
const Config& config,
- RecoveredPacketReceiver* recovered_packet_callback)
+ RecoveredPacketReceiver* recovered_packet_receiver)
: started_(false),
config_(config),
receiver_(
- MaybeCreateFlexfecReceiver(config_, recovered_packet_callback)) {
+ MaybeCreateFlexfecReceiver(config_, recovered_packet_receiver)) {
LOG(LS_INFO) << "FlexfecReceiveStreamImpl: " << config_.ToString();
}
@@ -100,8 +102,7 @@
}
bool FlexfecReceiveStreamImpl::AddAndProcessReceivedPacket(
- const uint8_t* packet,
- size_t packet_length) {
+ RtpPacketReceived packet) {
{
rtc::CritScope cs(&crit_);
if (!started_)
@@ -109,7 +110,7 @@
}
if (!receiver_)
return false;
- return receiver_->AddAndProcessReceivedPacket(packet, packet_length);
+ return receiver_->AddAndProcessReceivedPacket(std::move(packet));
}
void FlexfecReceiveStreamImpl::Start() {
diff --git a/webrtc/call/flexfec_receive_stream_impl.h b/webrtc/call/flexfec_receive_stream_impl.h
index 79c6cc0..7267dc0 100644
--- a/webrtc/call/flexfec_receive_stream_impl.h
+++ b/webrtc/call/flexfec_receive_stream_impl.h
@@ -18,16 +18,19 @@
#include "webrtc/base/criticalsection.h"
#include "webrtc/call/flexfec_receive_stream.h"
#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
+#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
namespace webrtc {
class FlexfecReceiveStreamImpl : public FlexfecReceiveStream {
public:
FlexfecReceiveStreamImpl(const Config& config,
- RecoveredPacketReceiver* recovered_packet_callback);
+ RecoveredPacketReceiver* recovered_packet_receiver);
~FlexfecReceiveStreamImpl() override;
- bool AddAndProcessReceivedPacket(const uint8_t* packet, size_t length);
+ const Config& GetConfig() const { return config_; }
+
+ bool AddAndProcessReceivedPacket(RtpPacketReceived packet);
// Implements FlexfecReceiveStream.
void Start() override;
diff --git a/webrtc/call/flexfec_receive_stream_unittest.cc b/webrtc/call/flexfec_receive_stream_unittest.cc
index 314d9c0..07d3943 100644
--- a/webrtc/call/flexfec_receive_stream_unittest.cc
+++ b/webrtc/call/flexfec_receive_stream_unittest.cc
@@ -8,24 +8,36 @@
* be found in the AUTHORS file in the root of the source tree.
*/
+#include "webrtc/base/array_view.h"
#include "webrtc/base/basictypes.h"
#include "webrtc/call/flexfec_receive_stream_impl.h"
#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
#include "webrtc/modules/rtp_rtcp/source/byte_io.h"
+#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
#include "webrtc/modules/rtp_rtcp/mocks/mock_recovered_packet_receiver.h"
#include "webrtc/test/gmock.h"
#include "webrtc/test/gtest.h"
namespace webrtc {
+namespace {
+
+RtpPacketReceived ParsePacket(rtc::ArrayView<const uint8_t> packet) {
+ RtpPacketReceived parsed_packet(nullptr);
+ EXPECT_TRUE(parsed_packet.Parse(packet));
+ return parsed_packet;
+}
+
+} // namespace
+
TEST(FlexfecReceiveStreamTest, ConstructDestruct) {
FlexfecReceiveStream::Config config;
config.payload_type = 118;
config.remote_ssrc = 424223;
config.protected_media_ssrcs = {912512};
- MockRecoveredPacketReceiver callback;
+ MockRecoveredPacketReceiver recovered_packet_receiver;
- FlexfecReceiveStreamImpl receive_stream(config, &callback);
+ FlexfecReceiveStreamImpl receive_stream(config, &recovered_packet_receiver);
}
TEST(FlexfecReceiveStreamTest, StartStop) {
@@ -33,27 +45,13 @@
config.payload_type = 118;
config.remote_ssrc = 1652392;
config.protected_media_ssrcs = {23300443};
- MockRecoveredPacketReceiver callback;
- FlexfecReceiveStreamImpl receive_stream(config, &callback);
+ MockRecoveredPacketReceiver recovered_packet_receiver;
+ FlexfecReceiveStreamImpl receive_stream(config, &recovered_packet_receiver);
receive_stream.Start();
receive_stream.Stop();
}
-TEST(FlexfecReceiveStreamTest, DoesNotProcessPacketWhenNoMediaSsrcGiven) {
- FlexfecReceiveStream::Config config;
- config.payload_type = 118;
- config.remote_ssrc = 424223;
- config.protected_media_ssrcs = {};
- MockRecoveredPacketReceiver callback;
- FlexfecReceiveStreamImpl receive_stream(config, &callback);
- const uint8_t packet[] = {0x00, 0x11, 0x22, 0x33};
- const size_t packet_length = sizeof(packet);
-
- EXPECT_FALSE(
- receive_stream.AddAndProcessReceivedPacket(packet, packet_length));
-}
-
// Create a FlexFEC packet that protects a single media packet and ensure
// that the callback is called. Correctness of recovery is checked in the
// FlexfecReceiver unit tests.
@@ -91,7 +89,6 @@
// FEC payload.
kPayloadBits, kPayloadBits, kPayloadBits, kPayloadBits};
// clang-format on
- constexpr size_t kFlexfecPacketLength = sizeof(kFlexfecPacket);
FlexfecReceiveStream::Config config;
config.payload_type = kFlexfecPlType;
@@ -102,16 +99,14 @@
FlexfecReceiveStreamImpl receive_stream(config, &recovered_packet_receiver);
// Do not call back before being started.
- receive_stream.AddAndProcessReceivedPacket(kFlexfecPacket,
- kFlexfecPacketLength);
+ receive_stream.AddAndProcessReceivedPacket(ParsePacket(kFlexfecPacket));
// Call back after being started.
receive_stream.Start();
EXPECT_CALL(
recovered_packet_receiver,
OnRecoveredPacket(::testing::_, kRtpHeaderSize + kPayloadLength[1]));
- receive_stream.AddAndProcessReceivedPacket(kFlexfecPacket,
- kFlexfecPacketLength);
+ receive_stream.AddAndProcessReceivedPacket(ParsePacket(kFlexfecPacket));
}
} // namespace webrtc