Implement an OperationsChain, to be used by PeerConnection in follow-up.

This allows asynchronous tasks to be queued to be executed in order.
The class is motivated by the "operations chain" in the spec:
https://w3c.github.io/webrtc-pc/#dfn-operations-chain

In a follow-up CL I intend to use this in PeerConnection's
CreateOffer(), CreateAnswer() SetLocalDescription() and
SetRemoteDescription() and unblock https://crbug.com/980885.

For background, motivation, requirements and implementation notes, see
https://docs.google.com/document/d/1XLwNN2kUIGGTwz9LQ0NwJNkcybi9oKnynUEZB1jGA14/edit?usp=sharing

Bug: webrtc:11019
Change-Id: I982e4a1c0e77fa62096c16deed459d9d9e9b63f0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/156120
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29582}
diff --git a/rtc_base/operations_chain_unittest.cc b/rtc_base/operations_chain_unittest.cc
new file mode 100644
index 0000000..8dbe607
--- /dev/null
+++ b/rtc_base/operations_chain_unittest.cc
@@ -0,0 +1,337 @@
+/*
+ *  Copyright 2019 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/operations_chain.h"
+
+#include <functional>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "rtc_base/bind.h"
+#include "rtc_base/event.h"
+#include "rtc_base/thread.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+
+namespace rtc {
+
+using ::testing::ElementsAre;
+
+class OperationTracker {
+ public:
+  OperationTracker() : background_thread_(Thread::Create()) {
+    background_thread_->Start();
+  }
+  // The caller is responsible for ensuring that no operations are pending.
+  ~OperationTracker() {}
+
+  // Creates a binding for the synchronous operation (see
+  // StartSynchronousOperation() below).
+  std::function<void(std::function<void()>)> BindSynchronousOperation(
+      Event* operation_complete_event) {
+    return [this, operation_complete_event](std::function<void()> callback) {
+      StartSynchronousOperation(operation_complete_event, std::move(callback));
+    };
+  }
+
+  // Creates a binding for the asynchronous operation (see
+  // StartAsynchronousOperation() below).
+  std::function<void(std::function<void()>)> BindAsynchronousOperation(
+      Event* unblock_operation_event,
+      Event* operation_complete_event) {
+    return [this, unblock_operation_event,
+            operation_complete_event](std::function<void()> callback) {
+      StartAsynchronousOperation(unblock_operation_event,
+                                 operation_complete_event, std::move(callback));
+    };
+  }
+
+  // When an operation is completed, its associated Event* is added to this
+  // list, in chronological order. This allows you to verify the order that
+  // operations are executed.
+  const std::vector<Event*>& completed_operation_events() const {
+    return completed_operation_events_;
+  }
+
+ private:
+  // This operation is completed synchronously; the callback is invoked before
+  // the function returns.
+  void StartSynchronousOperation(Event* operation_complete_event,
+                                 std::function<void()> callback) {
+    completed_operation_events_.push_back(operation_complete_event);
+    operation_complete_event->Set();
+    callback();
+  }
+
+  // This operation is completed asynchronously; it pings |background_thread_|,
+  // blocking that thread until |unblock_operation_event| is signaled and then
+  // completes upon posting back to the thread that the operation started on.
+  // Note that this requires the starting thread to be executing tasks (handle
+  // messages), i.e. must not be blocked.
+  void StartAsynchronousOperation(Event* unblock_operation_event,
+                                  Event* operation_complete_event,
+                                  std::function<void()> callback) {
+    Thread* current_thread = Thread::Current();
+    background_thread_->PostTask(
+        RTC_FROM_HERE, [this, current_thread, unblock_operation_event,
+                        operation_complete_event, callback]() {
+          unblock_operation_event->Wait(Event::kForever);
+          current_thread->PostTask(
+              RTC_FROM_HERE, [this, operation_complete_event, callback]() {
+                completed_operation_events_.push_back(operation_complete_event);
+                operation_complete_event->Set();
+                callback();
+              });
+        });
+  }
+
+  std::unique_ptr<Thread> background_thread_;
+  std::vector<Event*> completed_operation_events_;
+};
+
+// The OperationTrackerProxy ensures all operations are chained on a separate
+// thread. This allows tests to block while chained operations are posting
+// between threads.
+class OperationTrackerProxy {
+ public:
+  OperationTrackerProxy()
+      : operations_chain_thread_(Thread::Create()),
+        operation_tracker_(nullptr),
+        operations_chain_(nullptr) {
+    operations_chain_thread_->Start();
+  }
+
+  std::unique_ptr<Event> Initialize() {
+    std::unique_ptr<Event> event = std::make_unique<Event>();
+    operations_chain_thread_->PostTask(
+        RTC_FROM_HERE, [this, event_ptr = event.get()]() {
+          operation_tracker_ = std::make_unique<OperationTracker>();
+          operations_chain_ = OperationsChain::Create();
+          event_ptr->Set();
+        });
+    return event;
+  }
+
+  std::unique_ptr<Event> ReleaseOperationChain() {
+    std::unique_ptr<Event> event = std::make_unique<Event>();
+    operations_chain_thread_->PostTask(RTC_FROM_HERE,
+                                       [this, event_ptr = event.get()]() {
+                                         operations_chain_ = nullptr;
+                                         event_ptr->Set();
+                                       });
+    return event;
+  }
+
+  // Chains a synchronous operation on the operation chain's thread.
+  std::unique_ptr<Event> PostSynchronousOperation() {
+    std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
+    operations_chain_thread_->PostTask(
+        RTC_FROM_HERE, [this, operation_complete_event_ptr =
+                                  operation_complete_event.get()]() {
+          operations_chain_->ChainOperation(
+              operation_tracker_->BindSynchronousOperation(
+                  operation_complete_event_ptr));
+        });
+    return operation_complete_event;
+  }
+
+  // Chains an asynchronous operation on the operation chain's thread. This
+  // involves the operation chain thread and an additional background thread.
+  std::unique_ptr<Event> PostAsynchronousOperation(
+      Event* unblock_operation_event) {
+    std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
+    operations_chain_thread_->PostTask(
+        RTC_FROM_HERE,
+        [this, unblock_operation_event,
+         operation_complete_event_ptr = operation_complete_event.get()]() {
+          operations_chain_->ChainOperation(
+              operation_tracker_->BindAsynchronousOperation(
+                  unblock_operation_event, operation_complete_event_ptr));
+        });
+    return operation_complete_event;
+  }
+
+  // The order of completed events. Touches the |operation_tracker_| on the
+  // calling thread, this is only thread safe if all chained operations have
+  // completed.
+  const std::vector<Event*>& completed_operation_events() const {
+    return operation_tracker_->completed_operation_events();
+  }
+
+ private:
+  std::unique_ptr<Thread> operations_chain_thread_;
+  std::unique_ptr<OperationTracker> operation_tracker_;
+  scoped_refptr<OperationsChain> operations_chain_;
+};
+
+TEST(OperationsChainTest, SynchronousOperation) {
+  OperationTrackerProxy operation_tracker_proxy;
+  operation_tracker_proxy.Initialize()->Wait(Event::kForever);
+
+  operation_tracker_proxy.PostSynchronousOperation()->Wait(Event::kForever);
+}
+
+TEST(OperationsChainTest, AsynchronousOperation) {
+  OperationTrackerProxy operation_tracker_proxy;
+  operation_tracker_proxy.Initialize()->Wait(Event::kForever);
+
+  Event unblock_async_operation_event;
+  auto async_operation_completed_event =
+      operation_tracker_proxy.PostAsynchronousOperation(
+          &unblock_async_operation_event);
+  // This should not be signaled until we unblock the operation.
+  EXPECT_FALSE(async_operation_completed_event->Wait(0));
+  // Unblock the operation and wait for it to complete.
+  unblock_async_operation_event.Set();
+  async_operation_completed_event->Wait(Event::kForever);
+}
+
+TEST(OperationsChainTest,
+     SynchronousOperationsAreExecutedImmediatelyWhenChainIsEmpty) {
+  // Testing synchonicity must be done without the OperationTrackerProxy to
+  // ensure messages are not processed in parallel. This test has no background
+  // threads.
+  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
+  OperationTracker operation_tracker;
+  Event event0;
+  operations_chain->ChainOperation(
+      operation_tracker.BindSynchronousOperation(&event0));
+  // This should already be signaled. (If it wasn't, waiting wouldn't help,
+  // because we'd be blocking the only thread that exists.)
+  EXPECT_TRUE(event0.Wait(0));
+  // Chaining another operation should also execute immediately because the
+  // chain should already be empty.
+  Event event1;
+  operations_chain->ChainOperation(
+      operation_tracker.BindSynchronousOperation(&event1));
+  EXPECT_TRUE(event1.Wait(0));
+}
+
+TEST(OperationsChainTest, AsynchronousOperationBlocksSynchronousOperation) {
+  OperationTrackerProxy operation_tracker_proxy;
+  operation_tracker_proxy.Initialize()->Wait(Event::kForever);
+
+  Event unblock_async_operation_event;
+  auto async_operation_completed_event =
+      operation_tracker_proxy.PostAsynchronousOperation(
+          &unblock_async_operation_event);
+
+  auto sync_operation_completed_event =
+      operation_tracker_proxy.PostSynchronousOperation();
+
+  unblock_async_operation_event.Set();
+
+  sync_operation_completed_event->Wait(Event::kForever);
+  // The asynchronous avent should have blocked the synchronous event, meaning
+  // this should already be signaled.
+  EXPECT_TRUE(async_operation_completed_event->Wait(0));
+}
+
+TEST(OperationsChainTest, OperationsAreExecutedInOrder) {
+  OperationTrackerProxy operation_tracker_proxy;
+  operation_tracker_proxy.Initialize()->Wait(Event::kForever);
+
+  // Chain a mix of asynchronous and synchronous operations.
+  Event operation0_unblock_event;
+  auto operation0_completed_event =
+      operation_tracker_proxy.PostAsynchronousOperation(
+          &operation0_unblock_event);
+
+  Event operation1_unblock_event;
+  auto operation1_completed_event =
+      operation_tracker_proxy.PostAsynchronousOperation(
+          &operation1_unblock_event);
+
+  auto operation2_completed_event =
+      operation_tracker_proxy.PostSynchronousOperation();
+
+  auto operation3_completed_event =
+      operation_tracker_proxy.PostSynchronousOperation();
+
+  Event operation4_unblock_event;
+  auto operation4_completed_event =
+      operation_tracker_proxy.PostAsynchronousOperation(
+          &operation4_unblock_event);
+
+  auto operation5_completed_event =
+      operation_tracker_proxy.PostSynchronousOperation();
+
+  Event operation6_unblock_event;
+  auto operation6_completed_event =
+      operation_tracker_proxy.PostAsynchronousOperation(
+          &operation6_unblock_event);
+
+  // Unblock events in reverse order. Operations 5, 3 and 2 are synchronous and
+  // don't need to be unblocked.
+  operation6_unblock_event.Set();
+  operation4_unblock_event.Set();
+  operation1_unblock_event.Set();
+  operation0_unblock_event.Set();
+  // Await all operations. The await-order shouldn't matter since they all get
+  // executed eventually.
+  operation0_completed_event->Wait(Event::kForever);
+  operation1_completed_event->Wait(Event::kForever);
+  operation2_completed_event->Wait(Event::kForever);
+  operation3_completed_event->Wait(Event::kForever);
+  operation4_completed_event->Wait(Event::kForever);
+  operation5_completed_event->Wait(Event::kForever);
+  operation6_completed_event->Wait(Event::kForever);
+
+  EXPECT_THAT(
+      operation_tracker_proxy.completed_operation_events(),
+      ElementsAre(
+          operation0_completed_event.get(), operation1_completed_event.get(),
+          operation2_completed_event.get(), operation3_completed_event.get(),
+          operation4_completed_event.get(), operation5_completed_event.get(),
+          operation6_completed_event.get()));
+}
+
+TEST(OperationsChainTest,
+     SafeToReleaseReferenceToOperationChainWhileOperationIsPending) {
+  OperationTrackerProxy operation_tracker_proxy;
+  operation_tracker_proxy.Initialize()->Wait(Event::kForever);
+
+  Event unblock_async_operation_event;
+  auto async_operation_completed_event =
+      operation_tracker_proxy.PostAsynchronousOperation(
+          &unblock_async_operation_event);
+
+  // Pending operations keep the OperationChain alive, making it safe for the
+  // test to release any references before unblocking the async operation.
+  operation_tracker_proxy.ReleaseOperationChain()->Wait(Event::kForever);
+
+  unblock_async_operation_event.Set();
+  async_operation_completed_event->Wait(Event::kForever);
+}
+
+#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
+
+TEST(OperationsChainTest, OperationNotInvokingCallbackShouldCrash) {
+  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
+  EXPECT_DEATH(
+      operations_chain->ChainOperation([](std::function<void()> callback) {}),
+      "");
+}
+
+TEST(OperationsChainTest, OperationInvokingCallbackMultipleTimesShouldCrash) {
+  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
+  EXPECT_DEATH(
+      operations_chain->ChainOperation([](std::function<void()> callback) {
+        // Signal that the operation has completed multiple times.
+        callback();
+        callback();
+      }),
+      "");
+}
+
+#endif  // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
+
+}  // namespace rtc