Enqueue packet in pacer if sending fails

If a packet cannot be sent while the pacer is in use, it should be
queued. This avoids packet loss due to congestion.

BUG=1930
R=pwestin@webrtc.org, wu@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/1693004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@4250 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/webrtc/modules/pacing/include/paced_sender.h b/webrtc/modules/pacing/include/paced_sender.h
index 0eeb992..dc18915 100644
--- a/webrtc/modules/pacing/include/paced_sender.h
+++ b/webrtc/modules/pacing/include/paced_sender.h
@@ -42,7 +42,8 @@
     // Note: packets sent as a result of a callback should not pass by this
     // module again.
     // Called when it's time to send a queued packet.
-    virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+    // Returns false if packet cannot be sent.
+    virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
                                   int64_t capture_time_ms) = 0;
     // Called when it's a good time to send a padding data.
     virtual int TimeToSendPadding(int bytes) = 0;
@@ -90,15 +91,13 @@
   virtual int32_t Process();
 
  private:
-  // Checks if next packet in line can be transmitted. Returns true on success.
-  bool GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
-                     int64_t* capture_time_ms, Priority* priority,
-                     bool* last_packet);
+  // Return true if next packet in line should be transmitted.
+  // Return packet list that contains the next packet.
+  bool ShouldSendNextPacket(paced_sender::PacketList** packet_list);
 
   // Local helper function to GetNextPacket.
   void GetNextPacketFromList(paced_sender::PacketList* packets,
-      uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms,
-      bool* last_packet);
+      uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms);
 
   // Updates the number of bytes that can be sent for the next time interval.
   void UpdateBytesPerInterval(uint32_t delta_time_in_ms);
diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc
index 3e50275..2fdc5cf 100644
--- a/webrtc/modules/pacing/paced_sender.cc
+++ b/webrtc/modules/pacing/paced_sender.cc
@@ -181,7 +181,7 @@
   if (capture_time_ms < 0) {
     capture_time_ms = TickTime::MillisecondTimestamp();
   }
-  if (paused_ && priority == kNormalPriority &&
+  if (priority != kHighPriority &&
       capture_time_ms > capture_time_ms_last_queued_) {
     capture_time_ms_last_queued_ = capture_time_ms;
     TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
@@ -252,11 +252,25 @@
     uint32_t ssrc;
     uint16_t sequence_number;
     int64_t capture_time_ms;
-    Priority priority;
-    bool last_packet;
-    while (GetNextPacket(&ssrc, &sequence_number, &capture_time_ms,
-                         &priority, &last_packet)) {
-      if (priority == kNormalPriority) {
+    paced_sender::PacketList* packet_list;
+    while (ShouldSendNextPacket(&packet_list)) {
+      GetNextPacketFromList(packet_list, &ssrc, &sequence_number,
+                            &capture_time_ms);
+      critsect_->Leave();
+
+      const bool success = callback_->TimeToSendPacket(ssrc, sequence_number,
+                                                       capture_time_ms);
+      // If packet cannot be sent then keep it in packet list and exit early.
+      // There's no need to send more packets.
+      if (!success) {
+        return 0;
+      }
+
+      critsect_->Enter();
+      packet_list->pop_front();
+      const bool last_packet = packet_list->empty() ||
+          packet_list->front().capture_time_ms_ > capture_time_ms;
+      if (packet_list != high_priority_packets_.get()) {
         if (capture_time_ms > capture_time_ms_last_sent_) {
           capture_time_ms_last_sent_ = capture_time_ms;
         } else if (capture_time_ms == capture_time_ms_last_sent_ &&
@@ -264,9 +278,6 @@
           TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", capture_time_ms);
         }
       }
-      critsect_->Leave();
-      callback_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
-      critsect_->Enter();
     }
     if (high_priority_packets_->empty() &&
         normal_priority_packets_->empty() &&
@@ -295,61 +306,45 @@
 }
 
 // MUST have critsect_ when calling.
-bool PacedSender::GetNextPacket(uint32_t* ssrc, uint16_t* sequence_number,
-                                int64_t* capture_time_ms, Priority* priority,
-                                bool* last_packet) {
+bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
   if (media_budget_->bytes_remaining() <= 0) {
     // All bytes consumed for this interval.
     // Check if we have not sent in a too long time.
     if ((TickTime::Now() - time_last_send_).Milliseconds() >
         kMaxQueueTimeWithoutSendingMs) {
       if (!high_priority_packets_->empty()) {
-        *priority = kHighPriority;
-        GetNextPacketFromList(high_priority_packets_.get(), ssrc,
-                              sequence_number, capture_time_ms, last_packet);
+        *packet_list = high_priority_packets_.get();
         return true;
       }
       if (!normal_priority_packets_->empty()) {
-        *priority = kNormalPriority;
-        GetNextPacketFromList(normal_priority_packets_.get(), ssrc,
-                              sequence_number, capture_time_ms, last_packet);
+        *packet_list = normal_priority_packets_.get();
         return true;
       }
     }
     return false;
   }
   if (!high_priority_packets_->empty()) {
-    *priority = kHighPriority;
-    GetNextPacketFromList(high_priority_packets_.get(), ssrc, sequence_number,
-                          capture_time_ms, last_packet);
+    *packet_list = high_priority_packets_.get();
     return true;
   }
   if (!normal_priority_packets_->empty()) {
-    *priority = kNormalPriority;
-    GetNextPacketFromList(normal_priority_packets_.get(), ssrc,
-                          sequence_number, capture_time_ms, last_packet);
+    *packet_list = normal_priority_packets_.get();
     return true;
   }
   if (!low_priority_packets_->empty()) {
-    *priority = kLowPriority;
-    GetNextPacketFromList(low_priority_packets_.get(), ssrc, sequence_number,
-                          capture_time_ms, last_packet);
+    *packet_list = low_priority_packets_.get();
     return true;
   }
   return false;
 }
 
 void PacedSender::GetNextPacketFromList(paced_sender::PacketList* packets,
-    uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms,
-    bool* last_packet) {
+    uint32_t* ssrc, uint16_t* sequence_number, int64_t* capture_time_ms) {
   paced_sender::Packet packet = packets->front();
   UpdateMediaBytesSent(packet.bytes_);
   *sequence_number = packet.sequence_number_;
   *ssrc = packet.ssrc_;
   *capture_time_ms = packet.capture_time_ms_;
-  packets->pop_front();
-  *last_packet = packets->empty() ||
-      packets->front().capture_time_ms_ > *capture_time_ms;
 }
 
 // MUST have critsect_ when calling.
diff --git a/webrtc/modules/pacing/paced_sender_unittest.cc b/webrtc/modules/pacing/paced_sender_unittest.cc
index c8dcd97..94a5c0b 100644
--- a/webrtc/modules/pacing/paced_sender_unittest.cc
+++ b/webrtc/modules/pacing/paced_sender_unittest.cc
@@ -25,7 +25,7 @@
 class MockPacedSenderCallback : public PacedSender::Callback {
  public:
   MOCK_METHOD3(TimeToSendPacket,
-      void(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms));
+      bool(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms));
   MOCK_METHOD1(TimeToSendPadding,
       int(int bytes));
 };
@@ -34,8 +34,9 @@
  public:
   PacedSenderPadding() : padding_sent_(0) {}
 
-  void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+  bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
                         int64_t capture_time_ms) {
+    return true;
   }
 
   int TimeToSendPadding(int bytes) {
@@ -68,7 +69,9 @@
     EXPECT_FALSE(send_bucket_->SendPacket(priority, ssrc,
         sequence_number, capture_time_ms, size));
     EXPECT_CALL(callback_, TimeToSendPacket(
-        ssrc, sequence_number, capture_time_ms)).Times(1);
+        ssrc, sequence_number, capture_time_ms))
+        .Times(1)
+        .WillRepeatedly(Return(true));
   }
 
   MockPacedSenderCallback callback_;
@@ -98,7 +101,9 @@
   TickTime::AdvanceFakeClock(1);
   EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
   EXPECT_CALL(callback_, TimeToSendPacket(
-      ssrc, sequence_number++, capture_time_ms)).Times(1);
+      ssrc, sequence_number++, capture_time_ms))
+      .Times(1)
+      .WillRepeatedly(Return(true));
   send_bucket_->Process();
   sequence_number++;
   SendAndExpectPacket(PacedSender::kNormalPriority, ssrc, sequence_number++,
@@ -130,7 +135,9 @@
     EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
     TickTime::AdvanceFakeClock(5);
     EXPECT_CALL(callback_,
-        TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3);
+        TimeToSendPacket(ssrc, _, capture_time_ms))
+        .Times(3)
+        .WillRepeatedly(Return(true));
     EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
     EXPECT_EQ(0, send_bucket_->Process());
   }
@@ -177,7 +184,9 @@
 
     for (int i = 0; i < 3; ++i) {
       EXPECT_CALL(callback_, TimeToSendPacket(ssrc, queued_sequence_number++,
-                                              capture_time_ms)).Times(1);
+                                              capture_time_ms))
+          .Times(1)
+          .WillRepeatedly(Return(true));
    }
     EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
     EXPECT_EQ(0, send_bucket_->Process());
@@ -317,7 +326,9 @@
 
   // Expect all high and normal priority to be sent out first.
   EXPECT_CALL(callback_, TimeToSendPadding(_)).Times(0);
-  EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms)).Times(3);
+  EXPECT_CALL(callback_, TimeToSendPacket(ssrc, _, capture_time_ms))
+      .Times(3)
+      .WillRepeatedly(Return(true));
 
   EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
   TickTime::AdvanceFakeClock(5);
@@ -325,7 +336,9 @@
   EXPECT_EQ(0, send_bucket_->Process());
 
   EXPECT_CALL(callback_, TimeToSendPacket(
-      ssrc_low_priority, _, capture_time_ms_low_priority)).Times(1);
+      ssrc_low_priority, _, capture_time_ms_low_priority))
+      .Times(1)
+      .WillRepeatedly(Return(true));
 
   EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
   TickTime::AdvanceFakeClock(5);
@@ -378,8 +391,9 @@
   }
   // Expect high prio packets to come out first followed by all packets in the
   // way they were added.
-  EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms)).Times(3);
-
+  EXPECT_CALL(callback_, TimeToSendPacket(_, _, capture_time_ms))
+      .Times(3)
+      .WillRepeatedly(Return(true));
   send_bucket_->Resume();
 
   EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
@@ -387,9 +401,9 @@
   EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
   EXPECT_EQ(0, send_bucket_->Process());
 
-  EXPECT_CALL(callback_,
-              TimeToSendPacket(_, _, second_capture_time_ms)).Times(1);
-
+  EXPECT_CALL(callback_, TimeToSendPacket(_, _, second_capture_time_ms))
+      .Times(1)
+      .WillRepeatedly(Return(true));
   EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
   TickTime::AdvanceFakeClock(5);
   EXPECT_EQ(0, send_bucket_->TimeUntilNextProcess());
@@ -397,5 +411,62 @@
   EXPECT_EQ(0, send_bucket_->QueueInMs());
 }
 
+TEST_F(PacedSenderTest, ResendPacket) {
+  uint32_t ssrc = 12346;
+  uint16_t sequence_number = 1234;
+  int64_t capture_time_ms = TickTime::MillisecondTimestamp();
+  EXPECT_EQ(0, send_bucket_->QueueInMs());
+
+  EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
+                                        ssrc,
+                                        sequence_number,
+                                        capture_time_ms,
+                                        250));
+  EXPECT_FALSE(send_bucket_->SendPacket(PacedSender::kNormalPriority,
+                                        ssrc,
+                                        sequence_number + 1,
+                                        capture_time_ms + 1,
+                                        250));
+  TickTime::AdvanceFakeClock(10000);
+  EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms,
+            send_bucket_->QueueInMs());
+  // Fails to send first packet so only one call.
+  EXPECT_CALL(callback_, TimeToSendPacket(
+      ssrc, sequence_number, capture_time_ms))
+      .Times(1)
+      .WillOnce(Return(false));
+  TickTime::AdvanceFakeClock(10000);
+  send_bucket_->Process();
+
+  // Queue remains unchanged.
+  EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms,
+            send_bucket_->QueueInMs());
+
+  // Fails to send second packet.
+  EXPECT_CALL(callback_, TimeToSendPacket(
+      ssrc, sequence_number, capture_time_ms))
+      .Times(1)
+      .WillOnce(Return(true));
+  EXPECT_CALL(callback_, TimeToSendPacket(
+      ssrc, sequence_number + 1, capture_time_ms + 1))
+      .Times(1)
+      .WillOnce(Return(false));
+  TickTime::AdvanceFakeClock(10000);
+  send_bucket_->Process();
+
+  // Queue is reduced by 1 packet.
+  EXPECT_EQ(TickTime::MillisecondTimestamp() - capture_time_ms - 1,
+            send_bucket_->QueueInMs());
+
+  // Send second packet and queue becomes empty.
+  EXPECT_CALL(callback_, TimeToSendPacket(
+      ssrc, sequence_number + 1, capture_time_ms + 1))
+      .Times(1)
+      .WillOnce(Return(true));
+  TickTime::AdvanceFakeClock(10000);
+  send_bucket_->Process();
+  EXPECT_EQ(0, send_bucket_->QueueInMs());
+}
+
 }  // namespace test
 }  // namespace webrtc
diff --git a/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h b/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h
index 2110429..5d11448 100644
--- a/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h
+++ b/webrtc/modules/rtp_rtcp/interface/rtp_rtcp.h
@@ -486,7 +486,7 @@
         const RTPFragmentationHeader* fragmentation = NULL,
         const RTPVideoHeader* rtpVideoHdr = NULL) = 0;
 
-    virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+    virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
                                   int64_t capture_time_ms) = 0;
 
     virtual int TimeToSendPadding(int bytes) = 0;
diff --git a/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h b/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
index 34d3ca4..0d39f8b 100644
--- a/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
+++ b/webrtc/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
@@ -156,7 +156,7 @@
               const RTPFragmentationHeader* fragmentation,
               const RTPVideoHeader* rtpVideoHdr));
   MOCK_METHOD3(TimeToSendPacket,
-      void(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms));
+      bool(uint32_t ssrc, uint16_t sequence_number, int64_t capture_time_ms));
   MOCK_METHOD1(TimeToSendPadding,
       int(int bytes));
   MOCK_METHOD3(RegisterRtcpObservers,
diff --git a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
index c5defe8..99cadf2 100644
--- a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
+++ b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
@@ -967,7 +967,7 @@
   return ret_val;
 }
 
-void ModuleRtpRtcpImpl::TimeToSendPacket(uint32_t ssrc,
+bool ModuleRtpRtcpImpl::TimeToSendPacket(uint32_t ssrc,
                                          uint16_t sequence_number,
                                          int64_t capture_time_ms) {
   WEBRTC_TRACE(
@@ -985,19 +985,21 @@
   if (no_child_modules) {
     // Don't send from default module.
     if (SendingMedia() && ssrc == rtp_sender_.SSRC()) {
-      rtp_sender_.TimeToSendPacket(sequence_number, capture_time_ms);
+      return rtp_sender_.TimeToSendPacket(sequence_number, capture_time_ms);
     }
   } else {
     CriticalSectionScoped lock(critical_section_module_ptrs_.get());
     std::list<ModuleRtpRtcpImpl*>::iterator it = child_modules_.begin();
     while (it != child_modules_.end()) {
       if ((*it)->SendingMedia() && ssrc == (*it)->rtp_sender_.SSRC()) {
-        (*it)->rtp_sender_.TimeToSendPacket(sequence_number, capture_time_ms);
-        return;
+        return (*it)->rtp_sender_.TimeToSendPacket(sequence_number,
+                                                   capture_time_ms);
       }
       ++it;
     }
   }
+  // No RTP sender is interested in sending this packet.
+  return true;
 }
 
 int ModuleRtpRtcpImpl::TimeToSendPadding(int bytes) {
diff --git a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h
index 891d5b8..c70f7cb 100644
--- a/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h
+++ b/webrtc/modules/rtp_rtcp/source/rtp_rtcp_impl.h
@@ -188,7 +188,7 @@
       const RTPFragmentationHeader* fragmentation = NULL,
       const RTPVideoHeader* rtp_video_hdr = NULL);
 
-  virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+  virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
                                 int64_t capture_time_ms);
   // Returns the number of padding bytes actually sent, which can be more or
   // less than |bytes|.
diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc
index 07a3f1b..744bbe2 100644
--- a/webrtc/modules/rtp_rtcp/source/rtp_sender.cc
+++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.cc
@@ -701,7 +701,7 @@
 }
 
 // Called from pacer when we can send the packet.
-void RTPSender::TimeToSendPacket(uint16_t sequence_number,
+bool RTPSender::TimeToSendPacket(uint16_t sequence_number,
                                  int64_t capture_time_ms) {
   StorageType type;
   uint16_t length = IP_PACKET_SIZE;
@@ -709,11 +709,13 @@
   int64_t stored_time_ms;
 
   if (packet_history_ == NULL) {
-    return;
+    // Packet cannot be found. Allow sending to continue.
+    return true;
   }
   if (!packet_history_->GetRTPPacket(sequence_number, 0, data_buffer, &length,
                                      &stored_time_ms, &type)) {
-    return;
+    // Packet cannot be found. Allow sending to continue.
+    return true;
   }
   assert(length > 0);
 
@@ -736,7 +738,7 @@
                                       rtp_header.sequenceNumber,
                                       rtp_header.headerLength);
   }
-  SendPacketToNetwork(data_buffer, length);
+  return SendPacketToNetwork(data_buffer, length);
 }
 
 int RTPSender::TimeToSendPadding(int bytes) {
diff --git a/webrtc/modules/rtp_rtcp/source/rtp_sender.h b/webrtc/modules/rtp_rtcp/source/rtp_sender.h
index 1efba85..61dc1c5 100644
--- a/webrtc/modules/rtp_rtcp/source/rtp_sender.h
+++ b/webrtc/modules/rtp_rtcp/source/rtp_sender.h
@@ -165,7 +165,7 @@
                               const RTPHeader &rtp_header,
                               const int64_t now_ms) const;
 
-  void TimeToSendPacket(uint16_t sequence_number, int64_t capture_time_ms);
+  bool TimeToSendPacket(uint16_t sequence_number, int64_t capture_time_ms);
   int TimeToSendPadding(int bytes);
 
   // NACK.
diff --git a/webrtc/video_engine/vie_encoder.cc b/webrtc/video_engine/vie_encoder.cc
index 3bc4b06..be079a4 100644
--- a/webrtc/video_engine/vie_encoder.cc
+++ b/webrtc/video_engine/vie_encoder.cc
@@ -88,9 +88,9 @@
   explicit ViEPacedSenderCallback(ViEEncoder* owner)
       : owner_(owner) {
   }
-  virtual void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+  virtual bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
                                 int64_t capture_time_ms) {
-    owner_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
+    return owner_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
   }
   virtual int TimeToSendPadding(int bytes) {
     return owner_->TimeToSendPadding(bytes);
@@ -482,9 +482,10 @@
   return 0;
 }
 
-void ViEEncoder::TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+bool ViEEncoder::TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
                                   int64_t capture_time_ms) {
-  default_rtp_rtcp_->TimeToSendPacket(ssrc, sequence_number, capture_time_ms);
+  return default_rtp_rtcp_->TimeToSendPacket(ssrc, sequence_number,
+                                             capture_time_ms);
 }
 
 int ViEEncoder::TimeToSendPadding(int bytes) {
diff --git a/webrtc/video_engine/vie_encoder.h b/webrtc/video_engine/vie_encoder.h
index 1209b31..4c4cd18 100644
--- a/webrtc/video_engine/vie_encoder.h
+++ b/webrtc/video_engine/vie_encoder.h
@@ -173,7 +173,7 @@
                         const uint32_t round_trip_time_ms);
 
   // Called by PacedSender.
-  void TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
+  bool TimeToSendPacket(uint32_t ssrc, uint16_t sequence_number,
                         int64_t capture_time_ms);
   int TimeToSendPadding(int bytes);