network_tester: Remove usage of rtc::Thread::socketserver() and cleanup
Instead of creating a TaskQueue from packet_sender, create a rtc::Thread
in test_controller so that test_controller instantiates a SocketServer,
eliminating the use of rtc::Thread::socketserver().
Also did various cleanups, such as adding threading annotations, and
ensuring that all network operations are done in dedicated threads.
Bug: webrtc:13145
Test: Unittest, and manually verified using Android clients and Linux servers
Change-Id: I05ebe5e29bd80f14a193c9ee8b0bf63a1b6b94d7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/263321
Commit-Queue: Daniel.l Lee <daniel.l@hpcnt.com>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37411}
diff --git a/rtc_tools/network_tester/test_controller.cc b/rtc_tools/network_tester/test_controller.cc
index d0e2f4c..6b16708 100644
--- a/rtc_tools/network_tester/test_controller.cc
+++ b/rtc_tools/network_tester/test_controller.cc
@@ -14,7 +14,9 @@
#include "absl/types/optional.h"
#include "rtc_base/checks.h"
+#include "rtc_base/internal/default_socket_server.h"
#include "rtc_base/ip_address.h"
+#include "rtc_base/logging.h"
#include "rtc_base/thread.h"
namespace webrtc {
@@ -23,20 +25,32 @@
int max_port,
const std::string& config_file_path,
const std::string& log_file_path)
- // TODO(bugs.webrtc.org/13145): Add a SocketFactory argument.
- : socket_factory_(
- rtc::ThreadManager::Instance()->WrapCurrentThread()->socketserver()),
+ : socket_server_(rtc::CreateDefaultSocketServer()),
+ packet_sender_thread_(
+ std::make_unique<rtc::Thread>(socket_server_.get())),
+ socket_factory_(socket_server_.get()),
config_file_path_(config_file_path),
packet_logger_(log_file_path),
local_test_done_(false),
- remote_test_done_(false) {
+ remote_test_done_(false),
+ task_safety_flag_(PendingTaskSafetyFlag::CreateDetached()) {
RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
- packet_sender_checker_.Detach();
send_data_.fill(42);
- udp_socket_ =
- std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
- rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
- udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
+ packet_sender_thread_->SetName("PacketSender", nullptr);
+ packet_sender_thread_->Start();
+ packet_sender_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
+ RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+ udp_socket_ =
+ std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
+ rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
+ udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
+ });
+}
+
+TestController::~TestController() {
+ RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
+ packet_sender_thread_->Invoke<void>(
+ RTC_FROM_HERE, [this]() { task_safety_flag_->SetNotAlive(); });
}
void TestController::SendConnectTo(const std::string& hostname, int port) {
@@ -45,18 +59,22 @@
NetworkTesterPacket packet;
packet.set_type(NetworkTesterPacket::HAND_SHAKING);
SendData(packet, absl::nullopt);
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
local_test_done_ = false;
remote_test_done_ = false;
}
-void TestController::Run() {
- RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
- rtc::Thread::Current()->ProcessMessages(0);
-}
-
void TestController::SendData(const NetworkTesterPacket& packet,
absl::optional<size_t> data_size) {
+ if (!packet_sender_thread_->IsCurrent()) {
+ packet_sender_thread_->PostTask(ToQueuedTask(
+ task_safety_flag_,
+ [this, packet, data_size]() { this->SendData(packet, data_size); }));
+ return;
+ }
+ RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+ RTC_LOG(LS_VERBOSE) << "SendData";
+
// Can be call from packet_sender or from test_controller thread.
size_t packet_size = packet.ByteSizeLong();
send_data_[0] = packet_size;
@@ -69,17 +87,17 @@
}
void TestController::OnTestDone() {
- RTC_DCHECK_RUN_ON(&packet_sender_checker_);
+ RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
NetworkTesterPacket packet;
packet.set_type(NetworkTesterPacket::TEST_DONE);
SendData(packet, absl::nullopt);
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
local_test_done_ = true;
}
bool TestController::IsTestDone() {
RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
return local_test_done_ && remote_test_done_;
}
@@ -88,7 +106,8 @@
size_t len,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us) {
- RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
+ RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+ RTC_LOG(LS_VERBOSE) << "OnReadPacket";
size_t packet_size = data[0];
std::string receive_data(&data[1], packet_size);
NetworkTesterPacket packet;
@@ -100,17 +119,21 @@
packet.set_type(NetworkTesterPacket::TEST_START);
remote_address_ = remote_addr;
SendData(packet, absl::nullopt);
- packet_sender_.reset(new PacketSender(this, config_file_path_));
+ packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
+ task_safety_flag_,
+ config_file_path_));
packet_sender_->StartSending();
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
local_test_done_ = false;
remote_test_done_ = false;
break;
}
case NetworkTesterPacket::TEST_START: {
- packet_sender_.reset(new PacketSender(this, config_file_path_));
+ packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
+ task_safety_flag_,
+ config_file_path_));
packet_sender_->StartSending();
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
local_test_done_ = false;
remote_test_done_ = false;
break;
@@ -122,6 +145,7 @@
break;
}
case NetworkTesterPacket::TEST_DONE: {
+ MutexLock scoped_lock(&test_done_lock_);
remote_test_done_ = true;
break;
}