network_tester: Remove usage of rtc::Thread::socketserver() and cleanup

Instead of creating a TaskQueue from packet_sender, create an rtc::Thread
in test_controller so that test_controller instantiates a SocketServer,
eliminating the use of rtc::Thread::socketserver().
Also did various cleanups, such as adding threading annotations and
ensuring that all network operations are done on dedicated threads.

Bug: webrtc:13145
Test: Unittest, and manually verified using Android clients and Linux servers
Change-Id: I05ebe5e29bd80f14a193c9ee8b0bf63a1b6b94d7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/263321
Commit-Queue: Daniel.l Lee <daniel.l@hpcnt.com>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37411}
diff --git a/rtc_tools/network_tester/test_controller.cc b/rtc_tools/network_tester/test_controller.cc
index d0e2f4c..6b16708 100644
--- a/rtc_tools/network_tester/test_controller.cc
+++ b/rtc_tools/network_tester/test_controller.cc
@@ -14,7 +14,9 @@
 
 #include "absl/types/optional.h"
 #include "rtc_base/checks.h"
+#include "rtc_base/internal/default_socket_server.h"
 #include "rtc_base/ip_address.h"
+#include "rtc_base/logging.h"
 #include "rtc_base/thread.h"
 
 namespace webrtc {
@@ -23,20 +25,32 @@
                                int max_port,
                                const std::string& config_file_path,
                                const std::string& log_file_path)
-    // TODO(bugs.webrtc.org/13145): Add a SocketFactory argument.
-    : socket_factory_(
-          rtc::ThreadManager::Instance()->WrapCurrentThread()->socketserver()),
+    : socket_server_(rtc::CreateDefaultSocketServer()),
+      packet_sender_thread_(
+          std::make_unique<rtc::Thread>(socket_server_.get())),
+      socket_factory_(socket_server_.get()),
       config_file_path_(config_file_path),
       packet_logger_(log_file_path),
       local_test_done_(false),
-      remote_test_done_(false) {
+      remote_test_done_(false),
+      task_safety_flag_(PendingTaskSafetyFlag::CreateDetached()) {
   RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
-  packet_sender_checker_.Detach();
   send_data_.fill(42);
-  udp_socket_ =
-      std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
-          rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
-  udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
+  packet_sender_thread_->SetName("PacketSender", nullptr);
+  packet_sender_thread_->Start();
+  packet_sender_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
+    RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+    udp_socket_ =
+        std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
+            rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
+    udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
+  });
+}
+
+TestController::~TestController() {
+  RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
+  packet_sender_thread_->Invoke<void>(
+      RTC_FROM_HERE, [this]() { task_safety_flag_->SetNotAlive(); });
 }
 
 void TestController::SendConnectTo(const std::string& hostname, int port) {
@@ -45,18 +59,22 @@
   NetworkTesterPacket packet;
   packet.set_type(NetworkTesterPacket::HAND_SHAKING);
   SendData(packet, absl::nullopt);
-  MutexLock scoped_lock(&local_test_done_lock_);
+  MutexLock scoped_lock(&test_done_lock_);
   local_test_done_ = false;
   remote_test_done_ = false;
 }
 
-void TestController::Run() {
-  RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
-  rtc::Thread::Current()->ProcessMessages(0);
-}
-
 void TestController::SendData(const NetworkTesterPacket& packet,
                               absl::optional<size_t> data_size) {
+  if (!packet_sender_thread_->IsCurrent()) {
+    packet_sender_thread_->PostTask(ToQueuedTask(
+        task_safety_flag_,
+        [this, packet, data_size]() { this->SendData(packet, data_size); }));
+    return;
+  }
+  RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+  RTC_LOG(LS_VERBOSE) << "SendData";
+
   // Can be call from packet_sender or from test_controller thread.
   size_t packet_size = packet.ByteSizeLong();
   send_data_[0] = packet_size;
@@ -69,17 +87,17 @@
 }
 
 void TestController::OnTestDone() {
-  RTC_DCHECK_RUN_ON(&packet_sender_checker_);
+  RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
   NetworkTesterPacket packet;
   packet.set_type(NetworkTesterPacket::TEST_DONE);
   SendData(packet, absl::nullopt);
-  MutexLock scoped_lock(&local_test_done_lock_);
+  MutexLock scoped_lock(&test_done_lock_);
   local_test_done_ = true;
 }
 
 bool TestController::IsTestDone() {
   RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
-  MutexLock scoped_lock(&local_test_done_lock_);
+  MutexLock scoped_lock(&test_done_lock_);
   return local_test_done_ && remote_test_done_;
 }
 
@@ -88,7 +106,8 @@
                                   size_t len,
                                   const rtc::SocketAddress& remote_addr,
                                   const int64_t& packet_time_us) {
-  RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
+  RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+  RTC_LOG(LS_VERBOSE) << "OnReadPacket";
   size_t packet_size = data[0];
   std::string receive_data(&data[1], packet_size);
   NetworkTesterPacket packet;
@@ -100,17 +119,21 @@
       packet.set_type(NetworkTesterPacket::TEST_START);
       remote_address_ = remote_addr;
       SendData(packet, absl::nullopt);
-      packet_sender_.reset(new PacketSender(this, config_file_path_));
+      packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
+                                            task_safety_flag_,
+                                            config_file_path_));
       packet_sender_->StartSending();
-      MutexLock scoped_lock(&local_test_done_lock_);
+      MutexLock scoped_lock(&test_done_lock_);
       local_test_done_ = false;
       remote_test_done_ = false;
       break;
     }
     case NetworkTesterPacket::TEST_START: {
-      packet_sender_.reset(new PacketSender(this, config_file_path_));
+      packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
+                                            task_safety_flag_,
+                                            config_file_path_));
       packet_sender_->StartSending();
-      MutexLock scoped_lock(&local_test_done_lock_);
+      MutexLock scoped_lock(&test_done_lock_);
       local_test_done_ = false;
       remote_test_done_ = false;
       break;
@@ -122,6 +145,7 @@
       break;
     }
     case NetworkTesterPacket::TEST_DONE: {
+      MutexLock scoped_lock(&test_done_lock_);
       remote_test_done_ = true;
       break;
     }