Enqueue packet in pacer if sending fails
If a packet cannot be sent while pacer is in use it should be
queued. This avoid packet loss due to congestion.
BUG=1930
R=pwestin@webrtc.org, wu@webrtc.org
Review URL: https://webrtc-codereview.appspot.com/1693004
git-svn-id: http://webrtc.googlecode.com/svn/trunk@4250 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h
index 0eeb992..dc18915 100644
--- a/webrtc/modules/pacing/include/paced_sender.h
+++ b/webrtc/modules/pacing/include/paced_sender.h
@@ -42,7 +42,8 @@
// Note: packets sent as a result of a callback should not pass by this
// module again.
// Called when it's time to send a queued packet.
- virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+ // Returns false if packet cannot be sent.
+ virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
int64_t capture_time_ms) = 0;
// Called when it's a good time to send a padding data.
virtual int TimeToSendPadding(int bytes) = 0;
@@ -90,15 +91,13 @@
virtual int32_t Process();
private:
- // Checks if next packet in line can be transmitted. Returns true on success.
- bool GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
- int64_t* capture_time_ms, Priority* priority,
- bool* last_packet);
+ // Return true if next packet in line should be transmitted.
+ // Return packet list that contains the next packet.
+ bool ShouldSendNextPacket(paced_sender::PacketList** packet_list);
// Local helper function to GetNextPacket.
void GetNextPacketFromList(paced_sender::PacketList* packets,
- uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms,
- bool* last_packet);
+ uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms);
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBytesPerInterval(uint32_t delta_time_in_ms);
diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc
index 3e50275..2fdc5cf 100644
--- a/webrtc/modules/pacing/paced_sender.cc
+++ b/webrtc/modules/pacing/paced_sender.cc
@@ -181,7 +181,7 @@
if (capture_time_ms < 0) {
capture_time_ms = TickTime::MillisecondTimestamp();
}
- if (paused_ && priority == kNormalPriority &&
+ if (priority != kHighPriority &&
capture_time_ms > capture_time_ms_last_queued_) {
capture_time_ms_last_queued_ = capture_time_ms;
TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
@@ -252,11 +252,25 @@
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms;
- Priority priority;
- bool last_packet;
- while (GetNextPacket(&ssrc, &sequence_number, &capture_time_ms,
- &priority, &last_packet)) {
- if (priority == kNormalPriority) {
+ paced_sender::PacketList* packet_list;
+ while (ShouldSendNextPacket(&packet_list)) {
+ GetNextPacketFromList(packet_list, &ssrc, &sequence_number,
+ &capture_time_ms);
+ critsect_->Leave();
+
+ const bool success = callback_->TimeToSendPacket(ssrc, sequence_number,
+ capture_time_ms);
+ // If packet cannt be sent then keep it in packet list and exit early.
+ // There's no need to send more packets.
+ if (!success) {
+ return 0;
+ }
+
+ critsect_->Enter();
+ packet_list->pop_front();
+ const bool last_packet = packet_list->empty() ||
+ packet_list->front().capture_time_ms_ > capture_time_ms;
+ if (packet_list != high_priority_packets_.get()) {
if (capture_time_ms > capture_time_ms_last_sent_) {
capture_time_ms_last_sent_ = capture_time_ms;
} else if (capture_time_ms == capture_time_ms_last_sent_ &&
@@ -264,9 +278,6 @@
TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms);
}
}
- critsect_->Leave();
- callback_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
- critsect_->Enter();
}
if (high_priority_packets_->empty() &&
normal_priority_packets_->empty() &&
@@ -295,61 +306,45 @@
}
// MUST have critsect_ when calling.
-bool PacedSender::GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
- int64_t* capture_time_ms, Priority* priority,
- bool* last_packet) {
+bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
if (media_budget_->bytes_remaining() <= 0) {
// All bytes consumed for this interval.
// Check if we have not sent in a too long time.
if ((TickTime::Now() - time_last_send_).Milliseconds() >
kMaxQueueTimeWithoutSendingMs) {
if (!high_priority_packets_->empty()) {
- *priority = kHighPriority;
- GetNextPacketFromList(high_priority_packets_.get(), ssrc,
- sequence_number, capture_time_ms, last_packet);
+ *packet_list = high_priority_packets_.get();
return true;
}
if (!normal_priority_packets_->empty()) {
- *priority = kNormalPriority;
- GetNextPacketFromList(normal_priority_packets_.get(), ssrc,
- sequence_number, capture_time_ms, last_packet);
+ *packet_list = normal_priority_packets_.get();
return true;
}
}
return false;
}
if (!high_priority_packets_->empty()) {
- *priority = kHighPriority;
- GetNextPacketFromList(high_priority_packets_.get(), ssrc, sequence_number,
- capture_time_ms, last_packet);
+ *packet_list = high_priority_packets_.get();
return true;
}
if (!normal_priority_packets_->empty()) {
- *priority = kNormalPriority;
- GetNextPacketFromList(normal_priority_packets_.get(), ssrc,
- sequence_number, capture_time_ms, last_packet);
+ *packet_list = normal_priority_packets_.get();
return true;
}
if (!low_priority_packets_->empty()) {
- *priority = kLowPriority;
- GetNextPacketFromList(low_priority_packets_.get(), ssrc, sequence_number,
- capture_time_ms, last_packet);
+ *packet_list = low_priority_packets_.get();
return true;
}
return false;
}
void PacedSender::GetNextPacketFromList(paced_sender::PacketList* packets,
- uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms,
- bool* last_packet) {
+ uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms) {
paced_sender::Packet packet = packets->front();
UpdateMediaBytesSent(packet.bytes_);
*sequence_number = packet.sequence_number_;
*ssrc = packet.ssrc_;
*capture_time_ms = packet.capture_time_ms_;
- packets->pop_front();
- *last_packet = packets->empty() ||
- packets->front().capture_time_ms_ > *capture_time_ms;
}
// MUST have critsect_ when calling.
diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc
index c8dcd97..94a5c0b 100644
--- a/webrtc/modules/pacing/paced_sender_unittest.cc
+++ b/webrtc/modules/pacing/paced_sender_unittest.cc
@@ -25,7 +25,7 @@
class MockPacedSenderCallback : public PacedSender::Callback {
public:
MOCK_METHOD3(TimeToSendPacket,
- void(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms));
+ bool(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms));
MOCK_METHOD1(TimeToSendPadding,
int(int bytes));
};
@@ -34,8 +34,9 @@
public:
PacedSenderPadding() : padding_sent_(0) {}
- void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+ bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
int64_t capture_time_ms) {
+ return true;
}
int TimeToSendPadding(int bytes) {
@@ -68,7 +69,9 @@
EXPECT_FALSE(send_bucket_->SendPacket(priority, ssrc,
sequence_number, capture_time_ms, size));
EXPECT_CALL(callback_, TimeToSendPacket(
- ssrc, sequence_number, capture_time_ms)).Times(1);
+ ssrc, sequence_number, capture_time_ms))
+ .Times(1)
+ .WillRepeatedly(Return(true));
}
MockPacedSenderCallback callback_;
@@ -98,7 +101,9 @@
TickTime::AdvanceFakeClock(1);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_CALL(callback_, TimeToSendPacket(
- ssrc, sequence_number++, capture_time_ms)).Times(1);
+ ssrc, sequence_number++, capture_time_ms))
+ .Times(1)
+ .WillRepeatedly(Return(true));
send_bucket_->Process();
sequence_number++;
SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
@@ -130,7 +135,9 @@
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_CALL(callback_,
- TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3);
+ TimeToSendPacket(ssrc, _, capture_time_ms))
+ .Times(3)
+ .WillRepeatedly(Return(true));
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
}
@@ -177,7 +184,9 @@
for (int i = 0; i < 3; ++i) {
EXPECT_CALL(callback_, TimeToSendPacket(ssrc, queued_sequence_number++,
- capture_time_ms)).Times(1);
+ capture_time_ms))
+ .Times(1)
+ .WillRepeatedly(Return(true));
}
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
@@ -317,7 +326,9 @@
// Expect all high and normal priority to be sent out first.
EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
- EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3);
+ EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms))
+ .Times(3)
+ .WillRepeatedly(Return(true));
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
@@ -325,7 +336,9 @@
EXPECT_EQ(0, send_bucket_->Process());
EXPECT_CALL(callback_, TimeToSendPacket(
- ssrc_low_priority, _, capture_time_ms_low_priority)).Times(1);
+ ssrc_low_priority, _, capture_time_ms_low_priority))
+ .Times(1)
+ .WillRepeatedly(Return(true));
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
@@ -378,8 +391,9 @@
}
// Expect high prio packets to come out first followed by all packets in the
// way they were added.
- EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms)).Times(3);
-
+ EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms))
+ .Times(3)
+ .WillRepeatedly(Return(true));
send_bucket_->Resume();
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
@@ -387,9 +401,9 @@
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
EXPECT_EQ(0, send_bucket_->Process());
- EXPECT_CALL(callback_,
- TimeToSendPacket(_, _, second_capture_time_ms)).Times(1);
-
+ EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms))
+ .Times(1)
+ .WillRepeatedly(Return(true));
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
TickTime::AdvanceFakeClock(5);
EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
@@ -397,5 +411,62 @@
EXPECT_EQ(0, send_bucket_->QueueInMs());
}
+TEST_F(PacedSenderTest, ResendPacket) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = TickTime::MillisecondTimestamp();
+ EXPECT_EQ(0, send_bucket_->QueueInMs());
+
+ EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number,
+ capture_time_ms,
+ 250));
+ EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
+ ssrc,
+ sequence_number + 1,
+ capture_time_ms + 1,
+ 250));
+ TickTime::AdvanceFakeClock(10000);
+ EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms,
+ send_bucket_->QueueInMs());
+ // Fails to send first packet so only one call.
+ EXPECT_CALL(callback_, TimeToSendPacket(
+ ssrc, sequence_number, capture_time_ms))
+ .Times(1)
+ .WillOnce(Return(false));
+ TickTime::AdvanceFakeClock(10000);
+ send_bucket_->Process();
+
+ // Queue remains unchanged.
+ EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms,
+ send_bucket_->QueueInMs());
+
+ // Fails to send second packet.
+ EXPECT_CALL(callback_, TimeToSendPacket(
+ ssrc, sequence_number, capture_time_ms))
+ .Times(1)
+ .WillOnce(Return(true));
+ EXPECT_CALL(callback_, TimeToSendPacket(
+ ssrc, sequence_number + 1, capture_time_ms + 1))
+ .Times(1)
+ .WillOnce(Return(false));
+ TickTime::AdvanceFakeClock(10000);
+ send_bucket_->Process();
+
+ // Queue is reduced by 1 packet.
+ EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms - 1,
+ send_bucket_->QueueInMs());
+
+ // Send second packet and queue becomes empty.
+ EXPECT_CALL(callback_, TimeToSendPacket(
+ ssrc, sequence_number + 1, capture_time_ms + 1))
+ .Times(1)
+ .WillOnce(Return(true));
+ TickTime::AdvanceFakeClock(10000);
+ send_bucket_->Process();
+ EXPECT_EQ(0, send_bucket_->QueueInMs());
+}
+
} // namespace test
} // namespace webrtc
diff --git a/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h b/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h
index 2110429..5d11448 100644
--- a/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h
+++ b/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h
@@ -486,7 +486,7 @@
const RTPFragmentationHeader* fragmentation = NULL,
const RTPVideoHeader* rtpVideoHdr = NULL) = 0;
- virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+ virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
int64_t capture_time_ms) = 0;
virtual int TimeToSendPadding(int bytes) = 0;
diff --git a/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h b/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
index 34d3ca4..0d39f8b 100644
--- a/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
+++ b/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
@@ -156,7 +156,7 @@
const RTPFragmentationHeader* fragmentation,
const RTPVideoHeader* rtpVideoHdr));
MOCK_METHOD3(TimeToSendPacket,
- void(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms));
+ bool(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms));
MOCK_METHOD1(TimeToSendPadding,
int(int bytes));
MOCK_METHOD3(RegisterRtcpObservers,
diff --git a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
index c5defe8..99cadf2 100644
--- a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
+++ b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
@@ -967,7 +967,7 @@
return ret_val;
}
-void ModuleRtpRtcpImpl::TimeToSendPacket(uint32_t ssrc,
+bool ModuleRtpRtcpImpl::TimeToSendPacket(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms) {
WEBRTC_TRACE(
@@ -985,19 +985,21 @@
if (no_child_modules) {
// Don't send from default module.
if (SendingMedia() && ssrc == rtp_sender_.SSRC()) {
- rtp_sender_.TimeToSendPacket(sequence_number, capture_time_ms);
+ return rtp_sender_.TimeToSendPacket(sequence_number, capture_time_ms);
}
} else {
CriticalSectionScoped lock(critical_section_module_ptrs_.get());
std::list<ModuleRtpRtcpImpl*>::iterator it = child_modules_.begin();
while (it != child_modules_.end()) {
if ((*it)->SendingMedia() && ssrc == (*it)->rtp_sender_.SSRC()) {
- (*it)->rtp_sender_.TimeToSendPacket(sequence_number, capture_time_ms);
- return;
+ return (*it)->rtp_sender_.TimeToSendPacket(sequence_number,
+ capture_time_ms);
}
++it;
}
}
+ // No RTP sender is interested in sending this packet.
+ return true;
}
int ModuleRtpRtcpImpl::TimeToSendPadding(int bytes) {
diff --git a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h
index 891d5b8..c70f7cb 100644
--- a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h
+++ b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h
@@ -188,7 +188,7 @@
const RTPFragmentationHeader* fragmentation = NULL,
const RTPVideoHeader* rtp_video_hdr = NULL);
- virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+ virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
int64_t capture_time_ms);
// Returns the number of padding bytes actually sent, which can be more or
// less than |bytes|.
diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc
index 07a3f1b..744bbe2 100644
--- a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc
+++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc
@@ -701,7 +701,7 @@
}
// Called from pacer when we can send the packet.
-void RTPSender::TimeToSendPacket(uint16_t sequence_number,
+bool RTPSender::TimeToSendPacket(uint16_t sequence_number,
int64_t capture_time_ms) {
StorageType type;
uint16_t length = IP_PACKET_SIZE;
@@ -709,11 +709,13 @@
int64_t stored_time_ms;
if (packet_history_ == NULL) {
- return;
+ // Packet cannot be found. Allow sending to continue.
+ return true;
}
if (!packet_history_->GetRTPPacket(sequence_number, 0, data_buffer, &length,
&stored_time_ms, &type)) {
- return;
+ // Packet cannot be found. Allow sending to continue.
+ return true;
}
assert(length > 0);
@@ -736,7 +738,7 @@
rtp_header.sequenceNumber,
rtp_header.headerLength);
}
- SendPacketToNetwork(data_buffer, length);
+ return SendPacketToNetwork(data_buffer, length);
}
int RTPSender::TimeToSendPadding(int bytes) {
diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.h b/webrtc/modules/rtp_rtcp/source/rtp_sender.h
index 1efba85..61dc1c5 100644
--- a/webrtc/modules/rtp_rtcp/source/rtp_sender.h
+++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.h
@@ -165,7 +165,7 @@
const RTPHeader &rtp_header,
const int64_t now_ms) const;
- void TimeToSendPacket(uint16_t sequence_number, int64_t capture_time_ms);
+ bool TimeToSendPacket(uint16_t sequence_number, int64_t capture_time_ms);
int TimeToSendPadding(int bytes);
// NACK.
diff --git a/webrtc/video_engine/vie_encoder.cc b/webrtc/video_engine/vie_encoder.cc
index 3bc4b06..be079a4 100644
--- a/webrtc/video_engine/vie_encoder.cc
+++ b/webrtc/video_engine/vie_encoder.cc
@@ -88,9 +88,9 @@
explicit ViEPacedSenderCallback(ViEEncoder* owner)
: owner_(owner) {
}
- virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+ virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
int64_t capture_time_ms) {
- owner_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
+ return owner_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
}
virtual int TimeToSendPadding(int bytes) {
return owner_->TimeToSendPadding(bytes);
@@ -482,9 +482,10 @@
return 0;
}
-void ViEEncoder::TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+bool ViEEncoder::TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
int64_t capture_time_ms) {
- default_rtp_rtcp_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
+ return default_rtp_rtcp_->TimeToSendPacket(ssrc, sequence_number,
+ capture_time_ms);
}
int ViEEncoder::TimeToSendPadding(int bytes) {
diff --git a/webrtc/video_engine/vie_encoder.h b/webrtc/video_engine/vie_encoder.h
index 1209b31..4c4cd18 100644
--- a/webrtc/video_engine/vie_encoder.h
+++ b/webrtc/video_engine/vie_encoder.h
@@ -173,7 +173,7 @@
const uint32_t round_trip_time_ms);
// Called by PacedSender.
- void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+ bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
int64_t capture_time_ms);
int TimeToSendPadding(int bytes);