Reland "Running FrameBuffer on task queue."

This is a reland of 13943b7b7f6d00568912b9969db2c7871d18e21f

Original change's description:
> Running FrameBuffer on task queue.
> 
> This prepares for running WebRTC in simulated time where event::Wait
> based timing doesn't work.
> 
> Bug: webrtc:10365
> Change-Id: Ia0f9b1cc8e3c8c27a38e45b40487050a4699d8cf
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/129962
> Reviewed-by: Philip Eliasson <philipel@webrtc.org>
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Commit-Queue: Sebastian Jansson <srte@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#27422}

Bug: webrtc:10365
Change-Id: I412d3e0fe06c6dd57cdb42974f09e03f3a6ad038
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/131124
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27572}
diff --git a/modules/video_coding/frame_buffer2.cc b/modules/video_coding/frame_buffer2.cc
index 5d427b0..20b680e 100644
--- a/modules/video_coding/frame_buffer2.cc
+++ b/modules/video_coding/frame_buffer2.cc
@@ -17,6 +17,7 @@
 #include <utility>
 #include <vector>
 
+#include "absl/memory/memory.h"
 #include "api/video/encoded_image.h"
 #include "api/video/video_timing.h"
 #include "modules/video_coding/include/video_coding_defines.h"
@@ -53,6 +54,7 @@
                          VCMReceiveStatisticsCallback* stats_callback)
     : decoded_frames_history_(kMaxFramesHistory),
       clock_(clock),
+      callback_queue_(nullptr),
       jitter_estimator_(jitter_estimator),
       timing_(timing),
       inter_frame_delay_(clock_->TimeInMilliseconds()),
@@ -65,6 +67,55 @@
 
 FrameBuffer::~FrameBuffer() {}
 
+void FrameBuffer::NextFrame(
+    int64_t max_wait_time_ms,
+    bool keyframe_required,
+    rtc::TaskQueue* callback_queue,
+    std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler) {
+  RTC_DCHECK_RUN_ON(callback_queue);
+  TRACE_EVENT0("webrtc", "FrameBuffer::NextFrame");
+  int64_t latest_return_time_ms =
+      clock_->TimeInMilliseconds() + max_wait_time_ms;
+  rtc::CritScope lock(&crit_);
+  if (stopped_) {
+    return;
+  }
+  latest_return_time_ms_ = latest_return_time_ms;
+  keyframe_required_ = keyframe_required;
+  frame_handler_ = handler;
+  callback_queue_ = callback_queue;
+  StartWaitForNextFrameOnQueue();
+}
+
+void FrameBuffer::StartWaitForNextFrameOnQueue() {
+  RTC_DCHECK(callback_queue_);
+  RTC_DCHECK(!callback_task_.Running());
+  int64_t wait_ms = FindNextFrame(clock_->TimeInMilliseconds());
+  callback_task_ = RepeatingTaskHandle::DelayedStart(
+      callback_queue_->Get(), TimeDelta::ms(wait_ms), [this] {
+        // If this task has not been cancelled, we did not get any new frames
+        // while waiting. Continue with frame delivery.
+        rtc::CritScope lock(&crit_);
+        if (!frames_to_decode_.empty()) {
+          // We have frames, deliver!
+          frame_handler_(absl::WrapUnique(GetNextFrame()), kFrameFound);
+          CancelCallback();
+          return TimeDelta::Zero();  // Ignored.
+        } else if (clock_->TimeInMilliseconds() >= latest_return_time_ms_) {
+          // We have timed out, signal this and stop repeating.
+          frame_handler_(nullptr, kTimeout);
+          CancelCallback();
+          return TimeDelta::Zero();  // Ignored.
+        } else {
+          // If there's no frames to decode and there is still time left, it
+          // means that the frame buffer was cleared between creation and
+          // execution of this task. Continue waiting for the remaining time.
+          int64_t wait_ms = FindNextFrame(clock_->TimeInMilliseconds());
+          return TimeDelta::ms(wait_ms);
+        }
+      });
+}
+
 FrameBuffer::ReturnReason FrameBuffer::NextFrame(
     int64_t max_wait_time_ms,
     std::unique_ptr<EncodedFrame>* frame_out,
@@ -313,6 +364,7 @@
   rtc::CritScope lock(&crit_);
   stopped_ = true;
   new_continuous_frame_event_.Set();
+  CancelCallback();
 }
 
 void FrameBuffer::Clear() {
@@ -342,6 +394,12 @@
   return true;
 }
 
+void FrameBuffer::CancelCallback() {
+  frame_handler_ = {};
+  callback_task_.Stop();
+  callback_queue_ = nullptr;
+}
+
 bool FrameBuffer::IsCompleteSuperFrame(const EncodedFrame& frame) {
   if (frame.inter_layer_predicted) {
     // Check that all previous spatial layers are already inserted.
@@ -487,9 +545,19 @@
     last_continuous_picture_id = last_continuous_frame_->picture_id;
 
     // Since we now have new continuous frames there might be a better frame
-    // to return from NextFrame. Signal that thread so that it again can choose
-    // which frame to return.
+    // to return from NextFrame.
     new_continuous_frame_event_.Set();
+
+    if (callback_queue_) {
+      callback_queue_->PostTask([this] {
+        rtc::CritScope lock(&crit_);
+        if (!callback_task_.Running())
+          return;
+        RTC_CHECK(frame_handler_);
+        callback_task_.Stop();
+        StartWaitForNextFrameOnQueue();
+      });
+    }
   }
 
   return last_continuous_picture_id;