network_tester: Remove usage of rtc::Thread::socketserver() and cleanup
Instead of creating a TaskQueue from packet_sender, create an rtc::Thread
in test_controller so that test_controller instantiates a SocketServer,
eliminating the use of rtc::Thread::socketserver().
Also did various cleanups, such as adding threading annotations, and
ensuring that all network operations are done in dedicated threads.
Bug: webrtc:13145
Test: Unittest, and manually verified using Android clients and Linux servers
Change-Id: I05ebe5e29bd80f14a193c9ee8b0bf63a1b6b94d7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/263321
Commit-Queue: Daniel.l Lee <daniel.l@hpcnt.com>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37411}
diff --git a/rtc_tools/network_tester/BUILD.gn b/rtc_tools/network_tester/BUILD.gn
index 4fef840..bb1f5d9 100644
--- a/rtc_tools/network_tester/BUILD.gn
+++ b/rtc_tools/network_tester/BUILD.gn
@@ -47,6 +47,7 @@
"../../rtc_base:checks",
"../../rtc_base:ignore_wundef",
"../../rtc_base:ip_address",
+ "../../rtc_base:logging",
"../../rtc_base:macromagic",
"../../rtc_base:protobuf_utils",
"../../rtc_base:rtc_task_queue",
@@ -81,6 +82,7 @@
deps = [
":network_tester",
"../../rtc_base:gunit_helpers",
+ "../../rtc_base:threading",
"../../test:fileutils",
"../../test:test_support",
"//testing/gtest",
@@ -98,7 +100,11 @@
rtc_executable("network_tester_server") {
sources = [ "server.cc" ]
- deps = [ ":network_tester" ]
+ deps = [
+ ":network_tester",
+ "../../rtc_base:null_socket_server",
+ "../../rtc_base:threading",
+ ]
}
}
@@ -158,9 +164,13 @@
}
rtc_shared_library("network_tester_so") {
- sources = [ "jni.cpp" ]
+ sources = [ "jni.cc" ]
- deps = [ ":network_tester" ]
+ deps = [
+ ":network_tester",
+ "../../rtc_base:logging",
+ "../../rtc_base:threading",
+ ]
suppressed_configs += [ "//build/config/android:hide_all_but_jni_onload" ]
configs += [ "//build/config/android:hide_all_but_jni" ]
diff --git a/rtc_tools/network_tester/config_reader.cc b/rtc_tools/network_tester/config_reader.cc
index ed76a3f..16ae458 100644
--- a/rtc_tools/network_tester/config_reader.cc
+++ b/rtc_tools/network_tester/config_reader.cc
@@ -21,7 +21,8 @@
: proto_config_index_(0) {
std::ifstream config_stream(config_file_path,
std::ios_base::in | std::ios_base::binary);
- RTC_DCHECK(config_stream.is_open());
+ RTC_DCHECK(config_stream.is_open())
+ << "Config " << config_file_path << " open failed";
RTC_DCHECK(config_stream.good());
std::string config_data((std::istreambuf_iterator<char>(config_stream)),
(std::istreambuf_iterator<char>()));
diff --git a/rtc_tools/network_tester/jni.cpp b/rtc_tools/network_tester/jni.cc
similarity index 87%
rename from rtc_tools/network_tester/jni.cpp
rename to rtc_tools/network_tester/jni.cc
index 818dd7c..f192739 100644
--- a/rtc_tools/network_tester/jni.cpp
+++ b/rtc_tools/network_tester/jni.cc
@@ -13,12 +13,15 @@
#define JNIEXPORT __attribute__((visibility("default")))
#include <string>
+#include "rtc_base/logging.h"
+#include "rtc_base/thread.h"
#include "rtc_tools/network_tester/test_controller.h"
extern "C" JNIEXPORT jlong JNICALL
Java_com_google_media_networktester_NetworkTester_CreateTestController(
JNIEnv* jni,
jclass) {
+ rtc::ThreadManager::Instance()->WrapCurrentThread();
return reinterpret_cast<intptr_t>(new webrtc::TestController(
0, 0, "/mnt/sdcard/network_tester_client_config.dat",
"/mnt/sdcard/network_tester_client_packet_log.dat"));
@@ -47,7 +50,8 @@
JNIEnv* jni,
jclass,
jlong native_pointer) {
- reinterpret_cast<webrtc::TestController*>(native_pointer)->Run();
+ // 100 ms arbitrarily chosen, but it works well.
+ rtc::Thread::Current()->ProcessMessages(/*cms=*/100);
}
extern "C" JNIEXPORT void JNICALL
@@ -60,4 +64,5 @@
if (test_controller) {
delete test_controller;
}
+ rtc::ThreadManager::Instance()->UnwrapCurrentThread();
}
diff --git a/rtc_tools/network_tester/network_tester_unittest.cc b/rtc_tools/network_tester/network_tester_unittest.cc
index 156e8bb..60b34e4 100644
--- a/rtc_tools/network_tester/network_tester_unittest.cc
+++ b/rtc_tools/network_tester/network_tester_unittest.cc
@@ -20,6 +20,7 @@
namespace webrtc {
TEST(NetworkTesterTest, ServerClient) {
+ rtc::AutoThread main_thread;
TestController client(
0, 0, webrtc::test::ResourcePath("network_tester/client_config", "dat"),
webrtc::test::OutputPath() + "client_packet_log.dat");
diff --git a/rtc_tools/network_tester/packet_sender.cc b/rtc_tools/network_tester/packet_sender.cc
index b2c6cd9..b80bb98 100644
--- a/rtc_tools/network_tester/packet_sender.cc
+++ b/rtc_tools/network_tester/packet_sender.cc
@@ -15,8 +15,6 @@
#include <string>
#include <utility>
-#include "absl/types/optional.h"
-#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/time_utils.h"
@@ -29,12 +27,16 @@
class SendPacketTask : public QueuedTask {
public:
- explicit SendPacketTask(PacketSender* packet_sender)
- : target_time_ms_(rtc::TimeMillis()), packet_sender_(packet_sender) {}
+ explicit SendPacketTask(
+ PacketSender* packet_sender,
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
+ : target_time_ms_(rtc::TimeMillis()),
+ packet_sender_(packet_sender),
+ task_safety_flag_(task_safety_flag) {}
private:
bool Run() override {
- if (packet_sender_->IsSending()) {
+ if (task_safety_flag_->alive() && packet_sender_->IsSending()) {
packet_sender_->SendPacket();
target_time_ms_ += packet_sender_->GetSendIntervalMs();
int64_t delay_ms = std::max(static_cast<int64_t>(0),
@@ -48,17 +50,24 @@
}
int64_t target_time_ms_;
PacketSender* const packet_sender_;
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
};
class UpdateTestSettingTask : public QueuedTask {
public:
- UpdateTestSettingTask(PacketSender* packet_sender,
- std::unique_ptr<ConfigReader> config_reader)
+ UpdateTestSettingTask(
+ PacketSender* packet_sender,
+ std::unique_ptr<ConfigReader> config_reader,
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
: packet_sender_(packet_sender),
- config_reader_(std::move(config_reader)) {}
+ config_reader_(std::move(config_reader)),
+ task_safety_flag_(task_safety_flag) {}
private:
bool Run() override {
+ if (!task_safety_flag_->alive()) {
+ return true;
+ }
auto config = config_reader_->GetNextConfig();
if (config) {
packet_sender_->UpdateTestSetting((*config).packet_size,
@@ -73,34 +82,38 @@
}
PacketSender* const packet_sender_;
const std::unique_ptr<ConfigReader> config_reader_;
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
};
} // namespace
-PacketSender::PacketSender(TestController* test_controller,
- const std::string& config_file_path)
+PacketSender::PacketSender(
+ TestController* test_controller,
+ webrtc::TaskQueueBase* worker_queue,
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
+ const std::string& config_file_path)
: packet_size_(0),
send_interval_ms_(0),
sequence_number_(0),
sending_(false),
config_file_path_(config_file_path),
test_controller_(test_controller),
- task_queue_factory_(CreateDefaultTaskQueueFactory()),
- worker_queue_(task_queue_factory_->CreateTaskQueue(
- "Packet Sender",
- TaskQueueFactory::Priority::HIGH)) {}
+ worker_queue_(worker_queue),
+ task_safety_flag_(task_safety_flag) {}
PacketSender::~PacketSender() = default;
void PacketSender::StartSending() {
worker_queue_checker_.Detach();
- worker_queue_.PostTask([this]() {
+ worker_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
sending_ = true;
- });
- worker_queue_.PostTask(std::make_unique<UpdateTestSettingTask>(
- this, std::make_unique<ConfigReader>(config_file_path_)));
- worker_queue_.PostTask(std::make_unique<SendPacketTask>(this));
+ }));
+ worker_queue_->PostTask(std::make_unique<UpdateTestSettingTask>(
+ this, std::make_unique<ConfigReader>(config_file_path_),
+ task_safety_flag_));
+ worker_queue_->PostTask(
+ std::make_unique<SendPacketTask>(this, task_safety_flag_));
}
void PacketSender::StopSending() {
diff --git a/rtc_tools/network_tester/packet_sender.h b/rtc_tools/network_tester/packet_sender.h
index 233ed6a..323f75b 100644
--- a/rtc_tools/network_tester/packet_sender.h
+++ b/rtc_tools/network_tester/packet_sender.h
@@ -35,8 +35,11 @@
class PacketSender {
public:
- PacketSender(TestController* test_controller,
- const std::string& config_file_path);
+ PacketSender(
+ TestController* test_controller,
+ webrtc::TaskQueueBase* worker_queue,
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
+ const std::string& config_file_path);
~PacketSender();
PacketSender(const PacketSender&) = delete;
@@ -59,8 +62,8 @@
bool sending_ RTC_GUARDED_BY(worker_queue_checker_);
const std::string config_file_path_;
TestController* const test_controller_;
- std::unique_ptr<TaskQueueFactory> task_queue_factory_;
- rtc::TaskQueue worker_queue_;
+ webrtc::TaskQueueBase* worker_queue_;
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
};
} // namespace webrtc
diff --git a/rtc_tools/network_tester/server.cc b/rtc_tools/network_tester/server.cc
index 4074a48..f0f610a 100644
--- a/rtc_tools/network_tester/server.cc
+++ b/rtc_tools/network_tester/server.cc
@@ -8,13 +8,16 @@
* be found in the AUTHORS file in the root of the source tree.
*/
+#include "rtc_base/null_socket_server.h"
#include "rtc_tools/network_tester/test_controller.h"
int main(int /*argn*/, char* /*argv*/[]) {
+ rtc::Thread main_thread(std::make_unique<rtc::NullSocketServer>());
webrtc::TestController server(9090, 9090, "server_config.dat",
"server_packet_log.dat");
while (!server.IsTestDone()) {
- server.Run();
+ // 100 ms is arbitrarily chosen.
+ main_thread.ProcessMessages(/*cms=*/100);
}
return 0;
}
diff --git a/rtc_tools/network_tester/test_controller.cc b/rtc_tools/network_tester/test_controller.cc
index d0e2f4c..6b16708 100644
--- a/rtc_tools/network_tester/test_controller.cc
+++ b/rtc_tools/network_tester/test_controller.cc
@@ -14,7 +14,9 @@
#include "absl/types/optional.h"
#include "rtc_base/checks.h"
+#include "rtc_base/internal/default_socket_server.h"
#include "rtc_base/ip_address.h"
+#include "rtc_base/logging.h"
#include "rtc_base/thread.h"
namespace webrtc {
@@ -23,20 +25,32 @@
int max_port,
const std::string& config_file_path,
const std::string& log_file_path)
- // TODO(bugs.webrtc.org/13145): Add a SocketFactory argument.
- : socket_factory_(
- rtc::ThreadManager::Instance()->WrapCurrentThread()->socketserver()),
+ : socket_server_(rtc::CreateDefaultSocketServer()),
+ packet_sender_thread_(
+ std::make_unique<rtc::Thread>(socket_server_.get())),
+ socket_factory_(socket_server_.get()),
config_file_path_(config_file_path),
packet_logger_(log_file_path),
local_test_done_(false),
- remote_test_done_(false) {
+ remote_test_done_(false),
+ task_safety_flag_(PendingTaskSafetyFlag::CreateDetached()) {
RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
- packet_sender_checker_.Detach();
send_data_.fill(42);
- udp_socket_ =
- std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
- rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
- udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
+ packet_sender_thread_->SetName("PacketSender", nullptr);
+ packet_sender_thread_->Start();
+ packet_sender_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
+ RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+ udp_socket_ =
+ std::unique_ptr<rtc::AsyncPacketSocket>(socket_factory_.CreateUdpSocket(
+ rtc::SocketAddress(rtc::GetAnyIP(AF_INET), 0), min_port, max_port));
+ udp_socket_->SignalReadPacket.connect(this, &TestController::OnReadPacket);
+ });
+}
+
+TestController::~TestController() {
+ RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
+ packet_sender_thread_->Invoke<void>(
+ RTC_FROM_HERE, [this]() { task_safety_flag_->SetNotAlive(); });
}
void TestController::SendConnectTo(const std::string& hostname, int port) {
@@ -45,18 +59,22 @@
NetworkTesterPacket packet;
packet.set_type(NetworkTesterPacket::HAND_SHAKING);
SendData(packet, absl::nullopt);
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
local_test_done_ = false;
remote_test_done_ = false;
}
-void TestController::Run() {
- RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
- rtc::Thread::Current()->ProcessMessages(0);
-}
-
void TestController::SendData(const NetworkTesterPacket& packet,
absl::optional<size_t> data_size) {
+ if (!packet_sender_thread_->IsCurrent()) {
+ packet_sender_thread_->PostTask(ToQueuedTask(
+ task_safety_flag_,
+ [this, packet, data_size]() { this->SendData(packet, data_size); }));
+ return;
+ }
+ RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+ RTC_LOG(LS_VERBOSE) << "SendData";
+
// Can be call from packet_sender or from test_controller thread.
size_t packet_size = packet.ByteSizeLong();
send_data_[0] = packet_size;
@@ -69,17 +87,17 @@
}
void TestController::OnTestDone() {
- RTC_DCHECK_RUN_ON(&packet_sender_checker_);
+ RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
NetworkTesterPacket packet;
packet.set_type(NetworkTesterPacket::TEST_DONE);
SendData(packet, absl::nullopt);
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
local_test_done_ = true;
}
bool TestController::IsTestDone() {
RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
return local_test_done_ && remote_test_done_;
}
@@ -88,7 +106,8 @@
size_t len,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us) {
- RTC_DCHECK_RUN_ON(&test_controller_thread_checker_);
+ RTC_DCHECK_RUN_ON(packet_sender_thread_.get());
+ RTC_LOG(LS_VERBOSE) << "OnReadPacket";
size_t packet_size = data[0];
std::string receive_data(&data[1], packet_size);
NetworkTesterPacket packet;
@@ -100,17 +119,21 @@
packet.set_type(NetworkTesterPacket::TEST_START);
remote_address_ = remote_addr;
SendData(packet, absl::nullopt);
- packet_sender_.reset(new PacketSender(this, config_file_path_));
+ packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
+ task_safety_flag_,
+ config_file_path_));
packet_sender_->StartSending();
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
local_test_done_ = false;
remote_test_done_ = false;
break;
}
case NetworkTesterPacket::TEST_START: {
- packet_sender_.reset(new PacketSender(this, config_file_path_));
+ packet_sender_.reset(new PacketSender(this, packet_sender_thread_.get(),
+ task_safety_flag_,
+ config_file_path_));
packet_sender_->StartSending();
- MutexLock scoped_lock(&local_test_done_lock_);
+ MutexLock scoped_lock(&test_done_lock_);
local_test_done_ = false;
remote_test_done_ = false;
break;
@@ -122,6 +145,7 @@
break;
}
case NetworkTesterPacket::TEST_DONE: {
+ MutexLock scoped_lock(&test_done_lock_);
remote_test_done_ = true;
break;
}
diff --git a/rtc_tools/network_tester/test_controller.h b/rtc_tools/network_tester/test_controller.h
index 3933b46..b08fbd5 100644
--- a/rtc_tools/network_tester/test_controller.h
+++ b/rtc_tools/network_tester/test_controller.h
@@ -25,6 +25,7 @@
#include "rtc_base/ignore_wundef.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_tools/network_tester/packet_logger.h"
@@ -49,12 +50,11 @@
int max_port,
const std::string& config_file_path,
const std::string& log_file_path);
+ ~TestController() override;
TestController(const TestController&) = delete;
TestController& operator=(const TestController&) = delete;
- void Run();
-
void SendConnectTo(const std::string& hostname, int port);
void SendData(const NetworkTesterPacket& packet,
@@ -70,18 +70,24 @@
size_t len,
const rtc::SocketAddress& remote_addr,
const int64_t& packet_time_us);
- SequenceChecker test_controller_thread_checker_;
- SequenceChecker packet_sender_checker_;
- rtc::BasicPacketSocketFactory socket_factory_;
+ RTC_NO_UNIQUE_ADDRESS SequenceChecker test_controller_thread_checker_;
+ std::unique_ptr<rtc::SocketServer> socket_server_;
+ std::unique_ptr<rtc::Thread> packet_sender_thread_;
+ rtc::BasicPacketSocketFactory socket_factory_
+ RTC_GUARDED_BY(packet_sender_thread_);
const std::string config_file_path_;
- PacketLogger packet_logger_;
- Mutex local_test_done_lock_;
- bool local_test_done_ RTC_GUARDED_BY(local_test_done_lock_);
- bool remote_test_done_;
- std::array<char, kEthernetMtu> send_data_;
- std::unique_ptr<rtc::AsyncPacketSocket> udp_socket_;
+ PacketLogger packet_logger_ RTC_GUARDED_BY(packet_sender_thread_);
+ Mutex test_done_lock_ RTC_GUARDED_BY(test_controller_thread_checker_);
+ bool local_test_done_ RTC_GUARDED_BY(test_done_lock_);
+ bool remote_test_done_ RTC_GUARDED_BY(test_done_lock_);
+ std::array<char, kEthernetMtu> send_data_
+ RTC_GUARDED_BY(packet_sender_thread_);
+ std::unique_ptr<rtc::AsyncPacketSocket> udp_socket_
+ RTC_GUARDED_BY(packet_sender_thread_);
rtc::SocketAddress remote_address_;
- std::unique_ptr<PacketSender> packet_sender_;
+ std::unique_ptr<PacketSender> packet_sender_
+ RTC_GUARDED_BY(packet_sender_thread_);
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
};
} // namespace webrtc