network_tester: Remove usage of rtc::Thread::socketserver() and clean up

Instead of creating a TaskQueue in packet_sender, create an rtc::Thread
in test_controller so that test_controller instantiates a SocketServer,
eliminating the use of rtc::Thread::socketserver().
Also perform various cleanups, such as adding threading annotations and
ensuring that all network operations are done on dedicated threads.

Bug: webrtc:13145
Test: Unittest, and manually verified using Android clients and Linux servers
Change-Id: I05ebe5e29bd80f14a193c9ee8b0bf63a1b6b94d7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/263321
Commit-Queue: Daniel.l Lee <daniel.l@hpcnt.com>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37411}
diff --git a/rtc_tools/network_tester/BUILD.gn b/rtc_tools/network_tester/BUILD.gn
index 4fef840..bb1f5d9 100644
--- a/rtc_tools/network_tester/BUILD.gn
+++ b/rtc_tools/network_tester/BUILD.gn
@@ -47,6 +47,7 @@
       "../../rtc_base:checks",
       "../../rtc_base:ignore_wundef",
       "../../rtc_base:ip_address",
+      "../../rtc_base:logging",
       "../../rtc_base:macromagic",
       "../../rtc_base:protobuf_utils",
       "../../rtc_base:rtc_task_queue",
@@ -81,6 +82,7 @@
     deps = [
       ":network_tester",
       "../../rtc_base:gunit_helpers",
+      "../../rtc_base:threading",
       "../../test:fileutils",
       "../../test:test_support",
       "//testing/gtest",
@@ -98,7 +100,11 @@
   rtc_executable("network_tester_server") {
     sources = [ "server.cc" ]
 
-    deps = [ ":network_tester" ]
+    deps = [
+      ":network_tester",
+      "../../rtc_base:null_socket_server",
+      "../../rtc_base:threading",
+    ]
   }
 }
 
@@ -158,9 +164,13 @@
   }
 
   rtc_shared_library("network_tester_so") {
-    sources = [ "jni.cpp" ]
+    sources = [ "jni.cc" ]
 
-    deps = [ ":network_tester" ]
+    deps = [
+      ":network_tester",
+      "../../rtc_base:logging",
+      "../../rtc_base:threading",
+    ]
 
     suppressed_configs += [ "//build/config/android:hide_all_but_jni_onload" ]
     configs += [ "//build/config/android:hide_all_but_jni" ]
diff --git a/rtc_tools/network_tester/config_reader.cc b/rtc_tools/network_tester/config_reader.cc
index ed76a3f..16ae458 100644
--- a/rtc_tools/network_tester/config_reader.cc
+++ b/rtc_tools/network_tester/config_reader.cc
@@ -21,7 +21,8 @@
     : proto_config_index_(0) {
   std::ifstream config_stream(config_file_path,
                               std::ios_base::in | std::ios_base::binary);
-  RTC_DCHECK(config_stream.is_open());
+  RTC_DCHECK(config_stream.is_open())
+      << "Config " << config_file_path << " open failed";
   RTC_DCHECK(config_stream.good());
   std::string config_data((std::istreambuf_iterator<char>(config_stream)),
                           (std::istreambuf_iterator<char>()));
diff --git a/rtc_tools/network_tester/jni.cpp b/rtc_tools/network_tester/jni.cc
similarity index 87%
rename from rtc_tools/network_tester/jni.cpp
rename to rtc_tools/network_tester/jni.cc
index 818dd7c..f192739 100644
--- a/rtc_tools/network_tester/jni.cpp
+++ b/rtc_tools/network_tester/jni.cc
@@ -13,12 +13,15 @@
 #define JNIEXPORT __attribute__((visibility("default")))
 #include <string>
 
+#include "rtc_base/logging.h"
+#include "rtc_base/thread.h"
 #include "rtc_tools/network_tester/test_controller.h"
 
 extern "C" JNIEXPORT jlong JNICALL
 Java_com_google_media_networktester_NetworkTester_CreateTestController(
     JNIEnv* jni,
     jclass) {
+  rtc::ThreadManager::Instance()->WrapCurrentThread();
   return reinterpret_cast<intptr_t>(new webrtc::TestController(
       0, 0, "/mnt/sdcard/network_tester_client_config.dat",
       "/mnt/sdcard/network_tester_client_packet_log.dat"));
@@ -47,7 +50,8 @@
     JNIEnv* jni,
     jclass,
     jlong native_pointer) {
-  reinterpret_cast<webrtc::TestController*>(native_pointer)->Run();
+  // 100 ms is arbitrarily chosen, but it works well.
+  rtc::Thread::Current()->ProcessMessages(/*cms=*/100);
 }
 
 extern "C" JNIEXPORT void JNICALL
@@ -60,4 +64,5 @@
   if (test_controller) {
     delete test_controller;
   }
+  rtc::ThreadManager::Instance()->UnwrapCurrentThread();
 }
diff --git a/rtc_tools/network_tester/network_tester_unittest.cc b/rtc_tools/network_tester/network_tester_unittest.cc
index 156e8bb..60b34e4 100644
--- a/rtc_tools/network_tester/network_tester_unittest.cc
+++ b/rtc_tools/network_tester/network_tester_unittest.cc
@@ -20,6 +20,7 @@
 namespace webrtc {
 
 TEST(NetworkTesterTest, ServerClient) {
+  rtc::AutoThread main_thread;
   TestController client(
       0, 0, webrtc::test::ResourcePath("network_tester/client_config", "dat"),
       webrtc::test::OutputPath() + "client_packet_log.dat");
diff --git a/rtc_tools/network_tester/packet_sender.cc b/rtc_tools/network_tester/packet_sender.cc
index b2c6cd9..b80bb98 100644
--- a/rtc_tools/network_tester/packet_sender.cc
+++ b/rtc_tools/network_tester/packet_sender.cc
@@ -15,8 +15,6 @@
 #include <string>
 #include <utility>
 
-#include "absl/types/optional.h"
-#include "api/task_queue/default_task_queue_factory.h"
 #include "api/task_queue/queued_task.h"
 #include "api/task_queue/task_queue_base.h"
 #include "rtc_base/time_utils.h"
@@ -29,12 +27,16 @@
 
 class SendPacketTask : public QueuedTask {
  public:
-  explicit SendPacketTask(PacketSender* packet_sender)
-      : target_time_ms_(rtc::TimeMillis()), packet_sender_(packet_sender) {}
+  explicit SendPacketTask(
+      PacketSender* packet_sender,
+      rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
+      : target_time_ms_(rtc::TimeMillis()),
+        packet_sender_(packet_sender),
+        task_safety_flag_(task_safety_flag) {}
 
  private:
   bool Run() override {
-    if (packet_sender_->IsSending()) {
+    if (task_safety_flag_->alive() && packet_sender_->IsSending()) {
       packet_sender_->SendPacket();
       target_time_ms_ += packet_sender_->GetSendIntervalMs();
       int64_t delay_ms = std::max(static_cast<int64_t>(0),
@@ -48,17 +50,24 @@
   }
   int64_t target_time_ms_;
   PacketSender* const packet_sender_;
+  rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
 };
 
 class UpdateTestSettingTask : public QueuedTask {
  public:
-  UpdateTestSettingTask(PacketSender* packet_sender,
-                        std::unique_ptr<ConfigReader> config_reader)
+  UpdateTestSettingTask(
+      PacketSender* packet_sender,
+      std::unique_ptr<ConfigReader> config_reader,
+      rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
       : packet_sender_(packet_sender),
-        config_reader_(std::move(config_reader)) {}
+        config_reader_(std::move(config_reader)),
+        task_safety_flag_(task_safety_flag) {}
 
  private:
   bool Run() override {
+    if (!task_safety_flag_->alive()) {
+      return true;
+    }
     auto config = config_reader_->GetNextConfig();
     if (config) {
       packet_sender_->UpdateTestSetting((*config).packet_size,
@@ -73,34 +82,38 @@
   }
   PacketSender* const packet_sender_;
   const std::unique_ptr<ConfigReader> config_reader_;
+  rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
 };
 
 }  // namespace
 
-PacketSender::PacketSender(TestController* test_controller,
-                           const std::string& config_file_path)
+PacketSender::PacketSender(
+    TestController* test_controller,
+    webrtc::TaskQueueBase* worker_queue,
+    rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
+    const std::string& config_file_path)
     : packet_size_(0),
       send_interval_ms_(0),
       sequence_number_(0),
       sending_(false),
       config_file_path_(config_file_path),
       test_controller_(test_controller),
-      task_queue_factory_(CreateDefaultTaskQueueFactory()),
-      worker_queue_(task_queue_factory_->CreateTaskQueue(
-          "Packet Sender",
-          TaskQueueFactory::Priority::HIGH)) {}
+      worker_queue_(worker_queue),
+      task_safety_flag_(task_safety_flag) {}
 
 PacketSender::~PacketSender() = default;
 
 void PacketSender::StartSending() {
   worker_queue_checker_.Detach();
-  worker_queue_.PostTask([this]() {
+  worker_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
     RTC_DCHECK_RUN_ON(&worker_queue_checker_);
     sending_ = true;
-  });
-  worker_queue_.PostTask(std::make_unique<UpdateTestSettingTask>(
-      this, std::make_unique<ConfigReader>(config_file_path_)));
-  worker_queue_.PostTask(std::make_unique<SendPacketTask>(this));
+  }));
+  worker_queue_->PostTask(std::make_unique<UpdateTestSettingTask>(
+      this, std::make_unique<ConfigReader>(config_file_path_),
+      task_safety_flag_));
+  worker_queue_->PostTask(
+      std::make_unique<SendPacketTask>(this, task_safety_flag_));
 }
 
 void PacketSender::StopSending() {
diff --git a/rtc_tools/network_tester/packet_sender.h b/rtc_tools/network_tester/packet_sender.h
index 233ed6a..323f75b 100644
--- a/rtc_tools/network_tester/packet_sender.h
+++ b/rtc_tools/network_tester/packet_sender.h
@@ -35,8 +35,11 @@
 
 class PacketSender {
  public:
-  PacketSender(TestController* test_controller,
-               const std::string& config_file_path);
+  PacketSender(
+      TestController* test_controller,
+      webrtc::TaskQueueBase* worker_queue,
+      rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
+      const std::string& config_file_path);
   ~PacketSender();
 
   PacketSender(const PacketSender&) = delete;
@@ -59,8 +62,8 @@
   bool sending_ RTC_GUARDED_BY(worker_queue_checker_);
   const std::string config_file_path_;
   TestController* const test_controller_;
-  std::unique_ptr<TaskQueueFactory> task_queue_factory_;
-  rtc::TaskQueue worker_queue_;
+  webrtc::TaskQueueBase* worker_queue_;
+  rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
 };
 
 }  // namespace webrtc
diff --git a/rtc_tools/network_tester/server.cc b/rtc_tools/network_tester/server.cc
index 4074a48..f0f610a 100644
--- a/rtc_tools/network_tester/server.cc
+++ b/rtc_tools/network_tester/server.cc
@@ -8,13 +8,16 @@
  *  be found in the AUTHORS file in the root of the source tree.
  */
 
+#include "rtc_base/null_socket_server.h"
 #include "rtc_tools/network_tester/test_controller.h"
 
 int main(int /*argn*/, char* /*argv*/[]) {
+  rtc::Thread main_thread(std::make_unique<rtc::NullSocketServer>());
   webrtc::TestController server(9090, 9090, "server_config.dat",
                                 "server_packet_log.dat");
   while (!server.IsTestDone()) {
-    server.Run();
+    // 100 ms is arbitrarily chosen.
+    main_thread.ProcessMessages(/*cms=*/100);
   }
   return 0;
 }
diff --git a/rtc_tools/network_tester/test_controller.cc b/rtc_tools/network_tester/test_controller.cc
index d0e2f4c..6b16708 100644
--- a/rtc_tools/network_tester/test_controller.cc
+++ b/rtc_tools/network_tester/test_controller.cc
@@ -14,7 +14,9 @@
 
 #include "absl/types/optional.h"
 #include "rtc_base/checks.h"
+#include "rtc_base/internal/default_socket_server.h"
 #include "rtc_base/ip_address.h"
+#include "rtc_base/logging.h"
 #include "rtc_base/thread.h"
 
 namespace webrtc {
@@ -23,20 +25,32 @@
                                int max_port,
                                const std::string& config_file_path,
                                const std::string& log_file_path)
-    // TODO(bugs.webrtc.org/13145): Add a SocketFactory argument.
-    : socket_factory_(
-          rtc::ThreadManager::Instance()->WrapCurrentThread()->socketserver()),
+    : socket_server_(rtc::CreateDefaultSocketServer()),
+      packet_sender_thread_(
+          std::make_unique<rtc::Thread>(socket_server_.get())),
+      socket_factory_(socket_server_.get()),
       config_file_path_(config_file_path),
       packet_logger_(log_file_path),
       local_test_done_(false),
-      remote_test_done_(false) {
+      remote_test_done_(false),
+      task_safety_flag_(PendingTaskSafetyFlag::CreateDetached()) {
   RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
-  packet_sender_checker_.Detach();
   send_data_.fill(42);
-  udp_socket_ =
-      std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
-          rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
-  udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
+  packet_sender_thread_->SetName("PacketSender", nullptr);
+  packet_sender_thread_->Start();
+  packet_sender_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
+    RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+    udp_socket_ =
+        std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
+            rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
+    udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
+  });
+}
+
+TestController::~TestController() {
+  RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
+  packet_sender_thread_->Invoke<void>(
+      RTC_FROM_HERE, [this]() { task_safety_flag_->SetNotAlive(); });
 }
 
 void TestController::SendConnectTo(const std::string& hostname, int port) {
@@ -45,18 +59,22 @@
   NetworkTesterPacket packet;
   packet.set_type(NetworkTesterPacket::HAND_SHAKING);
   SendData(packet, absl::nullopt);
-  MutexLock scoped_lock(&local_test_done_lock_);
+  MutexLock scoped_lock(&test_done_lock_);
   local_test_done_ = false;
   remote_test_done_ = false;
 }
 
-void TestController::Run() {
-  RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
-  rtc::Thread::Current()->ProcessMessages(0);
-}
-
 void TestController::SendData(const NetworkTesterPacket& packet,
                               absl::optional<size_t> data_size) {
+  if (!packet_sender_thread_->IsCurrent()) {
+    packet_sender_thread_->PostTask(ToQueuedTask(
+        task_safety_flag_,
+        [this, packet, data_size]() { this->SendData(packet, data_size); }));
+    return;
+  }
+  RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+  RTC_LOG(LS_VERBOSE) << "SendData";
+
   // Can be call from packet_sender or from test_controller thread.
   size_t packet_size = packet.ByteSizeLong();
   send_data_[0] = packet_size;
@@ -69,17 +87,17 @@
 }
 
 void TestController::OnTestDone() {
-  RTC_DCHECK_RUN_ON(&packet_sender_checker_);
+  RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
   NetworkTesterPacket packet;
   packet.set_type(NetworkTesterPacket::TEST_DONE);
   SendData(packet, absl::nullopt);
-  MutexLock scoped_lock(&local_test_done_lock_);
+  MutexLock scoped_lock(&test_done_lock_);
   local_test_done_ = true;
 }
 
 bool TestController::IsTestDone() {
   RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
-  MutexLock scoped_lock(&local_test_done_lock_);
+  MutexLock scoped_lock(&test_done_lock_);
   return local_test_done_ && remote_test_done_;
 }
 
@@ -88,7 +106,8 @@
                                   size_t len,
                                   const rtc::SocketAddress& remote_addr,
                                   const int64_t& packet_time_us) {
-  RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
+  RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+  RTC_LOG(LS_VERBOSE) << "OnReadPacket";
   size_t packet_size = data[0];
   std::string receive_data(&data[1], packet_size);
   NetworkTesterPacket packet;
@@ -100,17 +119,21 @@
       packet.set_type(NetworkTesterPacket::TEST_START);
       remote_address_ = remote_addr;
       SendData(packet, absl::nullopt);
-      packet_sender_.reset(new PacketSender(this, config_file_path_));
+      packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
+                                            task_safety_flag_,
+                                            config_file_path_));
       packet_sender_->StartSending();
-      MutexLock scoped_lock(&local_test_done_lock_);
+      MutexLock scoped_lock(&test_done_lock_);
       local_test_done_ = false;
       remote_test_done_ = false;
       break;
     }
     case NetworkTesterPacket::TEST_START: {
-      packet_sender_.reset(new PacketSender(this, config_file_path_));
+      packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
+                                            task_safety_flag_,
+                                            config_file_path_));
       packet_sender_->StartSending();
-      MutexLock scoped_lock(&local_test_done_lock_);
+      MutexLock scoped_lock(&test_done_lock_);
       local_test_done_ = false;
       remote_test_done_ = false;
       break;
@@ -122,6 +145,7 @@
       break;
     }
     case NetworkTesterPacket::TEST_DONE: {
+      MutexLock scoped_lock(&test_done_lock_);
       remote_test_done_ = true;
       break;
     }
diff --git a/rtc_tools/network_tester/test_controller.h b/rtc_tools/network_tester/test_controller.h
index 3933b46..b08fbd5 100644
--- a/rtc_tools/network_tester/test_controller.h
+++ b/rtc_tools/network_tester/test_controller.h
@@ -25,6 +25,7 @@
 #include "rtc_base/ignore_wundef.h"
 #include "rtc_base/socket_address.h"
 #include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/system/no_unique_address.h"
 #include "rtc_base/third_party/sigslot/sigslot.h"
 #include "rtc_base/thread_annotations.h"
 #include "rtc_tools/network_tester/packet_logger.h"
@@ -49,12 +50,11 @@
                  int max_port,
                  const std::string& config_file_path,
                  const std::string& log_file_path);
+  ~TestController() override;
 
   TestController(const TestController&) = delete;
   TestController& operator=(const TestController&) = delete;
 
-  void Run();
-
   void SendConnectTo(const std::string& hostname, int port);
 
   void SendData(const NetworkTesterPacket& packet,
@@ -70,18 +70,24 @@
                     size_t len,
                     const rtc::SocketAddress& remote_addr,
                     const int64_t& packet_time_us);
-  SequenceChecker test_controller_thread_checker_;
-  SequenceChecker packet_sender_checker_;
-  rtc::BasicPacketSocketFactory socket_factory_;
+  RTC_NO_UNIQUE_ADDRESS SequenceChecker test_controller_thread_checker_;
+  std::unique_ptr<rtc::SocketServer> socket_server_;
+  std::unique_ptr<rtc::Thread> packet_sender_thread_;
+  rtc::BasicPacketSocketFactory socket_factory_
+      RTC_GUARDED_BY(packet_sender_thread_);
   const std::string config_file_path_;
-  PacketLogger packet_logger_;
-  Mutex local_test_done_lock_;
-  bool local_test_done_ RTC_GUARDED_BY(local_test_done_lock_);
-  bool remote_test_done_;
-  std::array<char, kEthernetMtu> send_data_;
-  std::unique_ptr<rtc::AsyncPacketSocket> udp_socket_;
+  PacketLogger packet_logger_ RTC_GUARDED_BY(packet_sender_thread_);
+  Mutex test_done_lock_ RTC_GUARDED_BY(test_controller_thread_checker_);
+  bool local_test_done_ RTC_GUARDED_BY(test_done_lock_);
+  bool remote_test_done_ RTC_GUARDED_BY(test_done_lock_);
+  std::array<char, kEthernetMtu> send_data_
+      RTC_GUARDED_BY(packet_sender_thread_);
+  std::unique_ptr<rtc::AsyncPacketSocket> udp_socket_
+      RTC_GUARDED_BY(packet_sender_thread_);
   rtc::SocketAddress remote_address_;
-  std::unique_ptr<PacketSender> packet_sender_;
+  std::unique_ptr<PacketSender> packet_sender_
+      RTC_GUARDED_BY(packet_sender_thread_);
+  rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
 };
 
 }  // namespace webrtc