/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
#include "rtc_base/thread.h"

#include <cstdint>
#include <cstring>
#include <memory>

#include "api/task_queue/task_queue_factory.h"
#include "api/task_queue/task_queue_test.h"
#include "rtc_base/async_invoker.h"
#include "rtc_base/async_udp_socket.h"
#include "rtc_base/atomic_ops.h"
#include "rtc_base/event.h"
#include "rtc_base/gunit.h"
#include "rtc_base/null_socket_server.h"
#include "rtc_base/physical_socket_server.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "test/testsupport/rtc_expect_death.h"

#if defined(WEBRTC_WIN)
#include <comdef.h>  // NOLINT

#endif
34
Mirko Bonadeie10b1632018-12-11 18:43:40 +010035namespace rtc {
36namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000037
Sebastian Jansson73387822020-01-16 11:15:35 +010038using ::webrtc::ToQueuedTask;
39
// Generates a sequence of numbers (collaboratively): every call to Next()
// adds the caller-supplied value to the value this generator produced last.
// The members are public because the tests read them directly.
class TestGenerator {
 public:
  TestGenerator() = default;

  // Returns `prev` plus the previously returned value, stores the sum as the
  // new "last" value and counts the call.
  int Next(int prev) {
    last += prev;
    ++count;
    return last;
  }

  int last = 0;   // Most recent value returned by Next(); 0 before any call.
  int count = 0;  // Number of Next() calls made so far.
};
55
56struct TestMessage : public MessageData {
57 explicit TestMessage(int v) : value(v) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000058
59 int value;
60};
61
62// Receives on a socket and sends by posting messages.
63class SocketClient : public TestGenerator, public sigslot::has_slots<> {
64 public:
Yves Gerey665174f2018-06-19 15:03:05 +020065 SocketClient(AsyncSocket* socket,
66 const SocketAddress& addr,
67 Thread* post_thread,
68 MessageHandler* phandler)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000069 : socket_(AsyncUDPSocket::Create(socket, addr)),
70 post_thread_(post_thread),
71 post_handler_(phandler) {
72 socket_->SignalReadPacket.connect(this, &SocketClient::OnPacket);
73 }
74
Steve Anton9de3aac2017-10-24 10:08:26 -070075 ~SocketClient() override { delete socket_; }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000076
77 SocketAddress address() const { return socket_->GetLocalAddress(); }
78
Yves Gerey665174f2018-06-19 15:03:05 +020079 void OnPacket(AsyncPacketSocket* socket,
80 const char* buf,
81 size_t size,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000082 const SocketAddress& remote_addr,
Niels Möllere6933812018-11-05 13:01:41 +010083 const int64_t& packet_time_us) {
Peter Boström0c4e06b2015-10-07 12:23:21 +020084 EXPECT_EQ(size, sizeof(uint32_t));
85 uint32_t prev = reinterpret_cast<const uint32_t*>(buf)[0];
86 uint32_t result = Next(prev);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000087
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070088 post_thread_->PostDelayed(RTC_FROM_HERE, 200, post_handler_, 0,
89 new TestMessage(result));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000090 }
91
92 private:
93 AsyncUDPSocket* socket_;
94 Thread* post_thread_;
95 MessageHandler* post_handler_;
96};
97
98// Receives messages and sends on a socket.
Tomas Gunnarssonabdb4702020-09-05 18:43:36 +020099class MessageClient : public MessageHandlerAutoCleanup, public TestGenerator {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000100 public:
Yves Gerey665174f2018-06-19 15:03:05 +0200101 MessageClient(Thread* pth, Socket* socket) : socket_(socket) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000102
Steve Anton9de3aac2017-10-24 10:08:26 -0700103 ~MessageClient() override { delete socket_; }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000104
Steve Anton9de3aac2017-10-24 10:08:26 -0700105 void OnMessage(Message* pmsg) override {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000106 TestMessage* msg = static_cast<TestMessage*>(pmsg->pdata);
107 int result = Next(msg->value);
108 EXPECT_GE(socket_->Send(&result, sizeof(result)), 0);
109 delete msg;
110 }
111
112 private:
113 Socket* socket_;
114};
115
deadbeefaea92932017-05-23 12:55:03 -0700116class CustomThread : public rtc::Thread {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000117 public:
tommie7251592017-07-14 14:44:46 -0700118 CustomThread()
119 : Thread(std::unique_ptr<SocketServer>(new rtc::NullSocketServer())) {}
Steve Anton9de3aac2017-10-24 10:08:26 -0700120 ~CustomThread() override { Stop(); }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000121 bool Start() { return false; }
jiayl@webrtc.orgba737cb2014-09-18 16:45:21 +0000122
Yves Gerey665174f2018-06-19 15:03:05 +0200123 bool WrapCurrent() { return Thread::WrapCurrent(); }
124 void UnwrapCurrent() { Thread::UnwrapCurrent(); }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000125};
126
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000127// A thread that does nothing when it runs and signals an event
128// when it is destroyed.
129class SignalWhenDestroyedThread : public Thread {
130 public:
131 SignalWhenDestroyedThread(Event* event)
tommie7251592017-07-14 14:44:46 -0700132 : Thread(std::unique_ptr<SocketServer>(new NullSocketServer())),
133 event_(event) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000134
Steve Anton9de3aac2017-10-24 10:08:26 -0700135 ~SignalWhenDestroyedThread() override {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000136 Stop();
137 event_->Set();
138 }
139
Steve Anton9de3aac2017-10-24 10:08:26 -0700140 void Run() override {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000141 // Do nothing.
142 }
143
144 private:
145 Event* event_;
146};
147
nissed9b75be2015-11-16 00:54:07 -0800148// A bool wrapped in a mutex, to avoid data races. Using a volatile
149// bool should be sufficient for correct code ("eventual consistency"
150// between caches is sufficient), but we can't tell the compiler about
151// that, and then tsan complains about a data race.
152
153// See also discussion at
154// http://stackoverflow.com/questions/7223164/is-mutex-needed-to-synchronize-a-simple-flag-between-pthreads
155
156// Using std::atomic<bool> or std::atomic_flag in C++11 is probably
157// the right thing to do, but those features are not yet allowed. Or
deadbeefaea92932017-05-23 12:55:03 -0700158// rtc::AtomicInt, if/when that is added. Since the use isn't
nissed9b75be2015-11-16 00:54:07 -0800159// performance critical, use a plain critical section for the time
160// being.
161
162class AtomicBool {
163 public:
164 explicit AtomicBool(bool value = false) : flag_(value) {}
165 AtomicBool& operator=(bool value) {
Markus Handell4ab7dde2020-07-10 13:23:25 +0200166 webrtc::MutexLock scoped_lock(&mutex_);
nissed9b75be2015-11-16 00:54:07 -0800167 flag_ = value;
168 return *this;
169 }
170 bool get() const {
Markus Handell4ab7dde2020-07-10 13:23:25 +0200171 webrtc::MutexLock scoped_lock(&mutex_);
nissed9b75be2015-11-16 00:54:07 -0800172 return flag_;
173 }
174
175 private:
Markus Handell4ab7dde2020-07-10 13:23:25 +0200176 mutable webrtc::Mutex mutex_;
nissed9b75be2015-11-16 00:54:07 -0800177 bool flag_;
178};
179
// Function objects to test Thread::Invoke.

// Returns a fixed, recognizable value.
struct FunctorA {
  int operator()() { return 42; }
};
184class FunctorB {
185 public:
nissed9b75be2015-11-16 00:54:07 -0800186 explicit FunctorB(AtomicBool* flag) : flag_(flag) {}
Yves Gerey665174f2018-06-19 15:03:05 +0200187 void operator()() {
188 if (flag_)
189 *flag_ = true;
190 }
191
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000192 private:
nissed9b75be2015-11-16 00:54:07 -0800193 AtomicBool* flag_;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000194};
195struct FunctorC {
196 int operator()() {
197 Thread::Current()->ProcessMessages(50);
198 return 24;
199 }
200};
Cameron Pickettd132ce12018-03-12 16:07:37 -0700201struct FunctorD {
202 public:
203 explicit FunctorD(AtomicBool* flag) : flag_(flag) {}
204 FunctorD(FunctorD&&) = default;
205 FunctorD& operator=(FunctorD&&) = default;
Yves Gerey665174f2018-06-19 15:03:05 +0200206 void operator()() {
207 if (flag_)
208 *flag_ = true;
209 }
210
Cameron Pickettd132ce12018-03-12 16:07:37 -0700211 private:
212 AtomicBool* flag_;
213 RTC_DISALLOW_COPY_AND_ASSIGN(FunctorD);
214};
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000215
216// See: https://code.google.com/p/webrtc/issues/detail?id=2409
217TEST(ThreadTest, DISABLED_Main) {
218 const SocketAddress addr("127.0.0.1", 0);
219
220 // Create the messaging client on its own thread.
tommie7251592017-07-14 14:44:46 -0700221 auto th1 = Thread::CreateWithSocketServer();
222 Socket* socket =
223 th1->socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM);
224 MessageClient msg_client(th1.get(), socket);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000225
226 // Create the socket client on its own thread.
tommie7251592017-07-14 14:44:46 -0700227 auto th2 = Thread::CreateWithSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000228 AsyncSocket* asocket =
tommie7251592017-07-14 14:44:46 -0700229 th2->socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM);
230 SocketClient sock_client(asocket, addr, th1.get(), &msg_client);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000231
232 socket->Connect(sock_client.address());
233
tommie7251592017-07-14 14:44:46 -0700234 th1->Start();
235 th2->Start();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000236
237 // Get the messages started.
tommie7251592017-07-14 14:44:46 -0700238 th1->PostDelayed(RTC_FROM_HERE, 100, &msg_client, 0, new TestMessage(1));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000239
240 // Give the clients a little while to run.
241 // Messages will be processed at 100, 300, 500, 700, 900.
242 Thread* th_main = Thread::Current();
243 th_main->ProcessMessages(1000);
244
245 // Stop the sending client. Give the receiver a bit longer to run, in case
246 // it is running on a machine that is under load (e.g. the build machine).
tommie7251592017-07-14 14:44:46 -0700247 th1->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000248 th_main->ProcessMessages(200);
tommie7251592017-07-14 14:44:46 -0700249 th2->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000250
251 // Make sure the results were correct
252 EXPECT_EQ(5, msg_client.count);
253 EXPECT_EQ(34, msg_client.last);
254 EXPECT_EQ(5, sock_client.count);
255 EXPECT_EQ(55, sock_client.last);
256}
257
258// Test that setting thread names doesn't cause a malfunction.
259// There's no easy way to verify the name was set properly at this time.
henrike@webrtc.orgc732a3e2014-10-09 22:08:15 +0000260TEST(ThreadTest, Names) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000261 // Default name
tommie7251592017-07-14 14:44:46 -0700262 auto thread = Thread::CreateWithSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000263 EXPECT_TRUE(thread->Start());
264 thread->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000265 // Name with no object parameter
tommie7251592017-07-14 14:44:46 -0700266 thread = Thread::CreateWithSocketServer();
deadbeef37f5ecf2017-02-27 14:06:41 -0800267 EXPECT_TRUE(thread->SetName("No object", nullptr));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000268 EXPECT_TRUE(thread->Start());
269 thread->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000270 // Really long name
tommie7251592017-07-14 14:44:46 -0700271 thread = Thread::CreateWithSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000272 EXPECT_TRUE(thread->SetName("Abcdefghijklmnopqrstuvwxyz1234567890", this));
273 EXPECT_TRUE(thread->Start());
274 thread->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000275}
276
henrike@webrtc.orge30dab72014-10-09 15:41:40 +0000277TEST(ThreadTest, Wrap) {
278 Thread* current_thread = Thread::Current();
Niels Möller5a8f8602019-06-12 11:30:59 +0200279 ThreadManager::Instance()->SetCurrentThread(nullptr);
280
281 {
282 CustomThread cthread;
283 EXPECT_TRUE(cthread.WrapCurrent());
284 EXPECT_EQ(&cthread, Thread::Current());
285 EXPECT_TRUE(cthread.RunningForTest());
286 EXPECT_FALSE(cthread.IsOwned());
287 cthread.UnwrapCurrent();
288 EXPECT_FALSE(cthread.RunningForTest());
289 }
290 ThreadManager::Instance()->SetCurrentThread(current_thread);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000291}
292
Artem Titovdfc5f0d2020-07-03 12:09:26 +0200293#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
294TEST(ThreadTest, InvokeToThreadAllowedReturnsTrueWithoutPolicies) {
295 // Create and start the thread.
296 auto thread1 = Thread::CreateWithSocketServer();
297 auto thread2 = Thread::CreateWithSocketServer();
298
299 thread1->PostTask(ToQueuedTask(
300 [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
301 Thread* th_main = Thread::Current();
302 th_main->ProcessMessages(100);
303}
304
305TEST(ThreadTest, InvokeAllowedWhenThreadsAdded) {
306 // Create and start the thread.
307 auto thread1 = Thread::CreateWithSocketServer();
308 auto thread2 = Thread::CreateWithSocketServer();
309 auto thread3 = Thread::CreateWithSocketServer();
310 auto thread4 = Thread::CreateWithSocketServer();
311
312 thread1->AllowInvokesToThread(thread2.get());
313 thread1->AllowInvokesToThread(thread3.get());
314
315 thread1->PostTask(ToQueuedTask([&]() {
316 EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get()));
317 EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get()));
318 EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get()));
319 }));
320 Thread* th_main = Thread::Current();
321 th_main->ProcessMessages(100);
322}
323
324TEST(ThreadTest, InvokesDisallowedWhenDisallowAllInvokes) {
325 // Create and start the thread.
326 auto thread1 = Thread::CreateWithSocketServer();
327 auto thread2 = Thread::CreateWithSocketServer();
328
329 thread1->DisallowAllInvokes();
330
331 thread1->PostTask(ToQueuedTask([&]() {
332 EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get()));
333 }));
334 Thread* th_main = Thread::Current();
335 th_main->ProcessMessages(100);
336}
337#endif // (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
338
339TEST(ThreadTest, InvokesAllowedByDefault) {
340 // Create and start the thread.
341 auto thread1 = Thread::CreateWithSocketServer();
342 auto thread2 = Thread::CreateWithSocketServer();
343
344 thread1->PostTask(ToQueuedTask(
345 [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
346 Thread* th_main = Thread::Current();
347 th_main->ProcessMessages(100);
348}
349
henrike@webrtc.orgc732a3e2014-10-09 22:08:15 +0000350TEST(ThreadTest, Invoke) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000351 // Create and start the thread.
tommie7251592017-07-14 14:44:46 -0700352 auto thread = Thread::CreateWithSocketServer();
353 thread->Start();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000354 // Try calling functors.
tommie7251592017-07-14 14:44:46 -0700355 EXPECT_EQ(42, thread->Invoke<int>(RTC_FROM_HERE, FunctorA()));
nissed9b75be2015-11-16 00:54:07 -0800356 AtomicBool called;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000357 FunctorB f2(&called);
tommie7251592017-07-14 14:44:46 -0700358 thread->Invoke<void>(RTC_FROM_HERE, f2);
nissed9b75be2015-11-16 00:54:07 -0800359 EXPECT_TRUE(called.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000360 // Try calling bare functions.
361 struct LocalFuncs {
362 static int Func1() { return 999; }
363 static void Func2() {}
364 };
tommie7251592017-07-14 14:44:46 -0700365 EXPECT_EQ(999, thread->Invoke<int>(RTC_FROM_HERE, &LocalFuncs::Func1));
366 thread->Invoke<void>(RTC_FROM_HERE, &LocalFuncs::Func2);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000367}
368
// Verifies that threads calling Invoke on each other in a cycle do not
// deadlock but crash instead.
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
// Cycle of two: main -> other -> main.
TEST(ThreadTest, TwoThreadsInvokeDeathTest) {
  ::testing::GTEST_FLAG(death_test_style) = "threadsafe";
  AutoThread thread;
  Thread* main_thread = Thread::Current();
  auto other_thread = Thread::CreateWithSocketServer();
  other_thread->Start();
  other_thread->Invoke<void>(RTC_FROM_HERE, [main_thread] {
    RTC_EXPECT_DEATH(main_thread->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
  });
}

// Cycle of three: first -> second -> third -> first.
TEST(ThreadTest, ThreeThreadsInvokeDeathTest) {
  ::testing::GTEST_FLAG(death_test_style) = "threadsafe";
  AutoThread thread;
  Thread* first = Thread::Current();

  auto second = Thread::Create();
  second->Start();
  auto third = Thread::Create();
  third->Start();

  second->Invoke<void>(RTC_FROM_HERE, [&] {
    third->Invoke<void>(RTC_FROM_HERE, [&] {
      RTC_EXPECT_DEATH(first->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
    });
  });
}

#endif  // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
401
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000402// Verifies that if thread A invokes a call on thread B and thread C is trying
403// to invoke A at the same time, thread A does not handle C's invoke while
404// invoking B.
405TEST(ThreadTest, ThreeThreadsInvoke) {
406 AutoThread thread;
407 Thread* thread_a = Thread::Current();
tommie7251592017-07-14 14:44:46 -0700408 auto thread_b = Thread::CreateWithSocketServer();
409 auto thread_c = Thread::CreateWithSocketServer();
410 thread_b->Start();
411 thread_c->Start();
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000412
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000413 class LockedBool {
414 public:
415 explicit LockedBool(bool value) : value_(value) {}
416
417 void Set(bool value) {
Markus Handell4ab7dde2020-07-10 13:23:25 +0200418 webrtc::MutexLock lock(&mutex_);
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000419 value_ = value;
420 }
421
422 bool Get() {
Markus Handell4ab7dde2020-07-10 13:23:25 +0200423 webrtc::MutexLock lock(&mutex_);
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000424 return value_;
425 }
426
427 private:
Markus Handell4ab7dde2020-07-10 13:23:25 +0200428 webrtc::Mutex mutex_;
429 bool value_ RTC_GUARDED_BY(mutex_);
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000430 };
431
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000432 struct LocalFuncs {
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000433 static void Set(LockedBool* out) { out->Set(true); }
434 static void InvokeSet(Thread* thread, LockedBool* out) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700435 thread->Invoke<void>(RTC_FROM_HERE, Bind(&Set, out));
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000436 }
437
438 // Set |out| true and call InvokeSet on |thread|.
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000439 static void SetAndInvokeSet(LockedBool* out,
440 Thread* thread,
441 LockedBool* out_inner) {
442 out->Set(true);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000443 InvokeSet(thread, out_inner);
444 }
445
446 // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
447 // |thread1| starts the call.
deadbeef162cb532017-02-23 17:10:07 -0800448 static void AsyncInvokeSetAndWait(AsyncInvoker* invoker,
449 Thread* thread1,
450 Thread* thread2,
451 LockedBool* out) {
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000452 LockedBool async_invoked(false);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000453
deadbeef162cb532017-02-23 17:10:07 -0800454 invoker->AsyncInvoke<void>(
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700455 RTC_FROM_HERE, thread1,
456 Bind(&SetAndInvokeSet, &async_invoked, thread2, out));
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000457
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000458 EXPECT_TRUE_WAIT(async_invoked.Get(), 2000);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000459 }
460 };
461
deadbeef162cb532017-02-23 17:10:07 -0800462 AsyncInvoker invoker;
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000463 LockedBool thread_a_called(false);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000464
465 // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
466 // Thread B returns when C receives the call and C should be blocked until A
467 // starts to process messages.
tommie7251592017-07-14 14:44:46 -0700468 thread_b->Invoke<void>(RTC_FROM_HERE,
469 Bind(&LocalFuncs::AsyncInvokeSetAndWait, &invoker,
470 thread_c.get(), thread_a, &thread_a_called));
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000471 EXPECT_FALSE(thread_a_called.Get());
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000472
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000473 EXPECT_TRUE_WAIT(thread_a_called.Get(), 2000);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000474}
475
jbauch25d1f282016-02-05 00:25:02 -0800476// Set the name on a thread when the underlying QueueDestroyed signal is
477// triggered. This causes an error if the object is already partially
478// destroyed.
479class SetNameOnSignalQueueDestroyedTester : public sigslot::has_slots<> {
480 public:
481 SetNameOnSignalQueueDestroyedTester(Thread* thread) : thread_(thread) {
482 thread->SignalQueueDestroyed.connect(
483 this, &SetNameOnSignalQueueDestroyedTester::OnQueueDestroyed);
484 }
485
486 void OnQueueDestroyed() {
487 // Makes sure that if we access the Thread while it's being destroyed, that
488 // it doesn't cause a problem because the vtable has been modified.
489 thread_->SetName("foo", nullptr);
490 }
491
492 private:
493 Thread* thread_;
494};
495
496TEST(ThreadTest, SetNameOnSignalQueueDestroyed) {
tommie7251592017-07-14 14:44:46 -0700497 auto thread1 = Thread::CreateWithSocketServer();
498 SetNameOnSignalQueueDestroyedTester tester1(thread1.get());
499 thread1.reset();
jbauch25d1f282016-02-05 00:25:02 -0800500
501 Thread* thread2 = new AutoThread();
502 SetNameOnSignalQueueDestroyedTester tester2(thread2);
503 delete thread2;
jbauch25d1f282016-02-05 00:25:02 -0800504}
505
Sebastian Jansson73387822020-01-16 11:15:35 +0100506class ThreadQueueTest : public ::testing::Test, public Thread {
507 public:
508 ThreadQueueTest() : Thread(SocketServer::CreateDefault(), true) {}
509 bool IsLocked_Worker() {
510 if (!CritForTest()->TryEnter()) {
511 return true;
512 }
513 CritForTest()->Leave();
514 return false;
515 }
516 bool IsLocked() {
517 // We have to do this on a worker thread, or else the TryEnter will
518 // succeed, since our critical sections are reentrant.
519 std::unique_ptr<Thread> worker(Thread::CreateWithSocketServer());
520 worker->Start();
521 return worker->Invoke<bool>(
522 RTC_FROM_HERE, rtc::Bind(&ThreadQueueTest::IsLocked_Worker, this));
523 }
524};
525
526struct DeletedLockChecker {
527 DeletedLockChecker(ThreadQueueTest* test, bool* was_locked, bool* deleted)
528 : test(test), was_locked(was_locked), deleted(deleted) {}
529 ~DeletedLockChecker() {
530 *deleted = true;
531 *was_locked = test->IsLocked();
532 }
533 ThreadQueueTest* test;
534 bool* was_locked;
535 bool* deleted;
536};
537
538static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) {
539 EXPECT_TRUE(q != nullptr);
540 int64_t now = TimeMillis();
541 q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
542 q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0);
543 q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1);
544 q->PostAt(RTC_FROM_HERE, now, nullptr, 4);
545 q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2);
546
547 Message msg;
548 for (size_t i = 0; i < 5; ++i) {
549 memset(&msg, 0, sizeof(msg));
550 EXPECT_TRUE(q->Get(&msg, 0));
551 EXPECT_EQ(i, msg.message_id);
552 }
553
554 EXPECT_FALSE(q->Get(&msg, 0)); // No more messages
555}
556
557TEST_F(ThreadQueueTest, DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
558 Thread q(SocketServer::CreateDefault(), true);
559 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
560
561 NullSocketServer nullss;
562 Thread q_nullss(&nullss, true);
563 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
564}
565
566TEST_F(ThreadQueueTest, DisposeNotLocked) {
567 bool was_locked = true;
568 bool deleted = false;
569 DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted);
570 Dispose(d);
571 Message msg;
572 EXPECT_FALSE(Get(&msg, 0));
573 EXPECT_TRUE(deleted);
574 EXPECT_FALSE(was_locked);
575}
576
Tomas Gunnarssonabdb4702020-09-05 18:43:36 +0200577class DeletedMessageHandler : public MessageHandlerAutoCleanup {
Sebastian Jansson73387822020-01-16 11:15:35 +0100578 public:
579 explicit DeletedMessageHandler(bool* deleted) : deleted_(deleted) {}
580 ~DeletedMessageHandler() override { *deleted_ = true; }
581 void OnMessage(Message* msg) override {}
582
583 private:
584 bool* deleted_;
585};
586
587TEST_F(ThreadQueueTest, DiposeHandlerWithPostedMessagePending) {
588 bool deleted = false;
589 DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted);
590 // First, post a dispose.
591 Dispose(handler);
592 // Now, post a message, which should *not* be returned by Get().
593 Post(RTC_FROM_HERE, handler, 1);
594 Message msg;
595 EXPECT_FALSE(Get(&msg, 0));
596 EXPECT_TRUE(deleted);
597}
598
599// Ensure that ProcessAllMessageQueues does its essential function; process
600// all messages (both delayed and non delayed) up until the current time, on
601// all registered message queues.
602TEST(ThreadManager, ProcessAllMessageQueues) {
603 Event entered_process_all_message_queues(true, false);
604 auto a = Thread::CreateWithSocketServer();
605 auto b = Thread::CreateWithSocketServer();
606 a->Start();
607 b->Start();
608
609 volatile int messages_processed = 0;
610 auto incrementer = [&messages_processed,
611 &entered_process_all_message_queues] {
612 // Wait for event as a means to ensure Increment doesn't occur outside
613 // of ProcessAllMessageQueues. The event is set by a message posted to
614 // the main thread, which is guaranteed to be handled inside
615 // ProcessAllMessageQueues.
616 entered_process_all_message_queues.Wait(Event::kForever);
617 AtomicOps::Increment(&messages_processed);
618 };
619 auto event_signaler = [&entered_process_all_message_queues] {
620 entered_process_all_message_queues.Set();
621 };
622
623 // Post messages (both delayed and non delayed) to both threads.
624 a->PostTask(ToQueuedTask(incrementer));
625 b->PostTask(ToQueuedTask(incrementer));
626 a->PostDelayedTask(ToQueuedTask(incrementer), 0);
627 b->PostDelayedTask(ToQueuedTask(incrementer), 0);
628 rtc::Thread::Current()->PostTask(ToQueuedTask(event_signaler));
629
630 ThreadManager::ProcessAllMessageQueuesForTesting();
631 EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed));
632}
633
634// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
635TEST(ThreadManager, ProcessAllMessageQueuesWithQuittingThread) {
636 auto t = Thread::CreateWithSocketServer();
637 t->Start();
638 t->Quit();
639 ThreadManager::ProcessAllMessageQueuesForTesting();
640}
641
642// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
643// messages.
644TEST(ThreadManager, ProcessAllMessageQueuesWithClearedQueue) {
645 Event entered_process_all_message_queues(true, false);
646 auto t = Thread::CreateWithSocketServer();
647 t->Start();
648
649 auto clearer = [&entered_process_all_message_queues] {
650 // Wait for event as a means to ensure Clear doesn't occur outside of
651 // ProcessAllMessageQueues. The event is set by a message posted to the
652 // main thread, which is guaranteed to be handled inside
653 // ProcessAllMessageQueues.
654 entered_process_all_message_queues.Wait(Event::kForever);
655 rtc::Thread::Current()->Clear(nullptr);
656 };
657 auto event_signaler = [&entered_process_all_message_queues] {
658 entered_process_all_message_queues.Set();
659 };
660
661 // Post messages (both delayed and non delayed) to both threads.
662 t->PostTask(RTC_FROM_HERE, clearer);
663 rtc::Thread::Current()->PostTask(RTC_FROM_HERE, event_signaler);
664 ThreadManager::ProcessAllMessageQueuesForTesting();
665}
666
Tomas Gunnarssonabdb4702020-09-05 18:43:36 +0200667class RefCountedHandler : public MessageHandlerAutoCleanup,
668 public rtc::RefCountInterface {
Sebastian Jansson73387822020-01-16 11:15:35 +0100669 public:
670 void OnMessage(Message* msg) override {}
671};
672
Tomas Gunnarssonabdb4702020-09-05 18:43:36 +0200673class EmptyHandler : public MessageHandlerAutoCleanup {
Sebastian Jansson73387822020-01-16 11:15:35 +0100674 public:
675 void OnMessage(Message* msg) override {}
676};
677
678TEST(ThreadManager, ClearReentrant) {
679 std::unique_ptr<Thread> t(Thread::Create());
680 EmptyHandler handler;
681 RefCountedHandler* inner_handler(
682 new rtc::RefCountedObject<RefCountedHandler>());
683 // When the empty handler is destroyed, it will clear messages queued for
684 // itself. The message to be cleared itself wraps a MessageHandler object
685 // (RefCountedHandler) so this will cause the message queue to be cleared
686 // again in a re-entrant fashion, which previously triggered a DCHECK.
687 // The inner handler will be removed in a re-entrant fashion from the
688 // message queue of the thread while the outer handler is removed, verifying
689 // that the iterator is not invalidated in "MessageQueue::Clear".
690 t->Post(RTC_FROM_HERE, inner_handler, 0);
691 t->Post(RTC_FROM_HERE, &handler, 0,
692 new ScopedRefMessageData<RefCountedHandler>(inner_handler));
693}
694
Mirko Bonadei6a489f22019-04-09 15:11:12 +0200695class AsyncInvokeTest : public ::testing::Test {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000696 public:
697 void IntCallback(int value) {
698 EXPECT_EQ(expected_thread_, Thread::Current());
699 int_value_ = value;
700 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000701 void SetExpectedThreadForIntCallback(Thread* thread) {
702 expected_thread_ = thread;
703 }
704
705 protected:
706 enum { kWaitTimeout = 1000 };
Yves Gerey665174f2018-06-19 15:03:05 +0200707 AsyncInvokeTest() : int_value_(0), expected_thread_(nullptr) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000708
709 int int_value_;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000710 Thread* expected_thread_;
711};
712
henrike@webrtc.orge30dab72014-10-09 15:41:40 +0000713TEST_F(AsyncInvokeTest, FireAndForget) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000714 AsyncInvoker invoker;
715 // Create and start the thread.
tommie7251592017-07-14 14:44:46 -0700716 auto thread = Thread::CreateWithSocketServer();
717 thread->Start();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000718 // Try calling functor.
nissed9b75be2015-11-16 00:54:07 -0800719 AtomicBool called;
tommie7251592017-07-14 14:44:46 -0700720 invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(), FunctorB(&called));
nissed9b75be2015-11-16 00:54:07 -0800721 EXPECT_TRUE_WAIT(called.get(), kWaitTimeout);
tommie7251592017-07-14 14:44:46 -0700722 thread->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000723}
724
Cameron Pickettd132ce12018-03-12 16:07:37 -0700725TEST_F(AsyncInvokeTest, NonCopyableFunctor) {
726 AsyncInvoker invoker;
727 // Create and start the thread.
728 auto thread = Thread::CreateWithSocketServer();
729 thread->Start();
730 // Try calling functor.
731 AtomicBool called;
732 invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(), FunctorD(&called));
733 EXPECT_TRUE_WAIT(called.get(), kWaitTimeout);
734 thread->Stop();
735}
736
deadbeef162cb532017-02-23 17:10:07 -0800737TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) {
738 // Use these events to get in a state where the functor is in the middle of
739 // executing, and then to wait for it to finish, ensuring the "EXPECT_FALSE"
740 // is run.
Niels Möllerc572ff32018-11-07 08:43:50 +0100741 Event functor_started;
742 Event functor_continue;
743 Event functor_finished;
deadbeef162cb532017-02-23 17:10:07 -0800744
tommie7251592017-07-14 14:44:46 -0700745 auto thread = Thread::CreateWithSocketServer();
746 thread->Start();
deadbeef162cb532017-02-23 17:10:07 -0800747 volatile bool invoker_destroyed = false;
748 {
deadbeef3af63b02017-08-08 17:59:47 -0700749 auto functor = [&functor_started, &functor_continue, &functor_finished,
750 &invoker_destroyed] {
751 functor_started.Set();
752 functor_continue.Wait(Event::kForever);
753 rtc::Thread::Current()->SleepMs(kWaitTimeout);
754 EXPECT_FALSE(invoker_destroyed);
755 functor_finished.Set();
756 };
deadbeef162cb532017-02-23 17:10:07 -0800757 AsyncInvoker invoker;
deadbeef3af63b02017-08-08 17:59:47 -0700758 invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(), functor);
deadbeef162cb532017-02-23 17:10:07 -0800759 functor_started.Wait(Event::kForever);
deadbeefaea92932017-05-23 12:55:03 -0700760
deadbeef3af63b02017-08-08 17:59:47 -0700761 // Destroy the invoker while the functor is still executing (doing
762 // SleepMs).
deadbeefaea92932017-05-23 12:55:03 -0700763 functor_continue.Set();
deadbeef162cb532017-02-23 17:10:07 -0800764 }
765
766 // If the destructor DIDN'T wait for the functor to finish executing, it will
767 // hit the EXPECT_FALSE(invoker_destroyed) after it finishes sleeping for a
768 // second.
769 invoker_destroyed = true;
770 functor_finished.Wait(Event::kForever);
771}
772
deadbeef3af63b02017-08-08 17:59:47 -0700773// Variant of the above test where the async-invoked task calls AsyncInvoke
774// *again*, for the thread on which the AsyncInvoker is currently being
775// destroyed. This shouldn't deadlock or crash; this second invocation should
776// just be ignored.
777TEST_F(AsyncInvokeTest, KillInvokerDuringExecuteWithReentrantInvoke) {
Niels Möllerc572ff32018-11-07 08:43:50 +0100778 Event functor_started;
deadbeef3af63b02017-08-08 17:59:47 -0700779 // Flag used to verify that the recursively invoked task never actually runs.
780 bool reentrant_functor_run = false;
781
782 Thread* main = Thread::Current();
Mirko Bonadei317a1f02019-09-17 17:06:18 +0200783 Thread thread(std::make_unique<NullSocketServer>());
deadbeef3af63b02017-08-08 17:59:47 -0700784 thread.Start();
785 {
786 AsyncInvoker invoker;
787 auto reentrant_functor = [&reentrant_functor_run] {
788 reentrant_functor_run = true;
789 };
790 auto functor = [&functor_started, &invoker, main, reentrant_functor] {
791 functor_started.Set();
792 Thread::Current()->SleepMs(kWaitTimeout);
793 invoker.AsyncInvoke<void>(RTC_FROM_HERE, main, reentrant_functor);
794 };
795 // This queues a task on |thread| to sleep for |kWaitTimeout| then queue a
796 // task on |main|. But this second queued task should never run, since the
797 // destructor will be entered before it's even invoked.
798 invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread, functor);
799 functor_started.Wait(Event::kForever);
800 }
801 EXPECT_FALSE(reentrant_functor_run);
802}
803
henrike@webrtc.orgc732a3e2014-10-09 22:08:15 +0000804TEST_F(AsyncInvokeTest, Flush) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000805 AsyncInvoker invoker;
nissed9b75be2015-11-16 00:54:07 -0800806 AtomicBool flag1;
807 AtomicBool flag2;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000808 // Queue two async calls to the current thread.
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700809 invoker.AsyncInvoke<void>(RTC_FROM_HERE, Thread::Current(), FunctorB(&flag1));
810 invoker.AsyncInvoke<void>(RTC_FROM_HERE, Thread::Current(), FunctorB(&flag2));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000811 // Because we haven't pumped messages, these should not have run yet.
nissed9b75be2015-11-16 00:54:07 -0800812 EXPECT_FALSE(flag1.get());
813 EXPECT_FALSE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000814 // Force them to run now.
815 invoker.Flush(Thread::Current());
nissed9b75be2015-11-16 00:54:07 -0800816 EXPECT_TRUE(flag1.get());
817 EXPECT_TRUE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000818}
819
henrike@webrtc.orgc732a3e2014-10-09 22:08:15 +0000820TEST_F(AsyncInvokeTest, FlushWithIds) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000821 AsyncInvoker invoker;
nissed9b75be2015-11-16 00:54:07 -0800822 AtomicBool flag1;
823 AtomicBool flag2;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000824 // Queue two async calls to the current thread, one with a message id.
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700825 invoker.AsyncInvoke<void>(RTC_FROM_HERE, Thread::Current(), FunctorB(&flag1),
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000826 5);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700827 invoker.AsyncInvoke<void>(RTC_FROM_HERE, Thread::Current(), FunctorB(&flag2));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000828 // Because we haven't pumped messages, these should not have run yet.
nissed9b75be2015-11-16 00:54:07 -0800829 EXPECT_FALSE(flag1.get());
830 EXPECT_FALSE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000831 // Execute pending calls with id == 5.
832 invoker.Flush(Thread::Current(), 5);
nissed9b75be2015-11-16 00:54:07 -0800833 EXPECT_TRUE(flag1.get());
834 EXPECT_FALSE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000835 flag1 = false;
836 // Execute all pending calls. The id == 5 call should not execute again.
837 invoker.Flush(Thread::Current());
nissed9b75be2015-11-16 00:54:07 -0800838 EXPECT_FALSE(flag1.get());
839 EXPECT_TRUE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000840}
841
Henrik Boströmba4dcc32019-02-28 09:34:06 +0100842void ThreadIsCurrent(Thread* thread, bool* result, Event* event) {
843 *result = thread->IsCurrent();
844 event->Set();
845}
846
847void WaitAndSetEvent(Event* wait_event, Event* set_event) {
848 wait_event->Wait(Event::kForever);
849 set_event->Set();
850}
851
852// A functor that keeps track of the number of copies and moves.
853class LifeCycleFunctor {
854 public:
855 struct Stats {
856 size_t copy_count = 0;
857 size_t move_count = 0;
858 };
859
860 LifeCycleFunctor(Stats* stats, Event* event) : stats_(stats), event_(event) {}
861 LifeCycleFunctor(const LifeCycleFunctor& other) { *this = other; }
862 LifeCycleFunctor(LifeCycleFunctor&& other) { *this = std::move(other); }
863
864 LifeCycleFunctor& operator=(const LifeCycleFunctor& other) {
865 stats_ = other.stats_;
866 event_ = other.event_;
867 ++stats_->copy_count;
868 return *this;
869 }
870
871 LifeCycleFunctor& operator=(LifeCycleFunctor&& other) {
872 stats_ = other.stats_;
873 event_ = other.event_;
874 ++stats_->move_count;
875 return *this;
876 }
877
878 void operator()() { event_->Set(); }
879
880 private:
881 Stats* stats_;
882 Event* event_;
883};
884
885// A functor that verifies the thread it was destroyed on.
886class DestructionFunctor {
887 public:
888 DestructionFunctor(Thread* thread, bool* thread_was_current, Event* event)
889 : thread_(thread),
890 thread_was_current_(thread_was_current),
891 event_(event) {}
892 ~DestructionFunctor() {
893 // Only signal the event if this was the functor that was invoked to avoid
894 // the event being signaled due to the destruction of temporary/moved
895 // versions of this object.
896 if (was_invoked_) {
897 *thread_was_current_ = thread_->IsCurrent();
898 event_->Set();
899 }
900 }
901
902 void operator()() { was_invoked_ = true; }
903
904 private:
905 Thread* thread_;
906 bool* thread_was_current_;
907 Event* event_;
908 bool was_invoked_ = false;
909};
910
911TEST(ThreadPostTaskTest, InvokesWithBind) {
912 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
913 background_thread->Start();
914
915 Event event;
916 background_thread->PostTask(RTC_FROM_HERE, Bind(&Event::Set, &event));
917 event.Wait(Event::kForever);
918}
919
920TEST(ThreadPostTaskTest, InvokesWithLambda) {
921 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
922 background_thread->Start();
923
924 Event event;
925 background_thread->PostTask(RTC_FROM_HERE, [&event] { event.Set(); });
926 event.Wait(Event::kForever);
927}
928
929TEST(ThreadPostTaskTest, InvokesWithCopiedFunctor) {
930 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
931 background_thread->Start();
932
933 LifeCycleFunctor::Stats stats;
934 Event event;
935 LifeCycleFunctor functor(&stats, &event);
936 background_thread->PostTask(RTC_FROM_HERE, functor);
937 event.Wait(Event::kForever);
938
939 EXPECT_EQ(1u, stats.copy_count);
940 EXPECT_EQ(0u, stats.move_count);
941}
942
943TEST(ThreadPostTaskTest, InvokesWithMovedFunctor) {
944 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
945 background_thread->Start();
946
947 LifeCycleFunctor::Stats stats;
948 Event event;
949 LifeCycleFunctor functor(&stats, &event);
950 background_thread->PostTask(RTC_FROM_HERE, std::move(functor));
951 event.Wait(Event::kForever);
952
953 EXPECT_EQ(0u, stats.copy_count);
954 EXPECT_EQ(1u, stats.move_count);
955}
956
957TEST(ThreadPostTaskTest, InvokesWithReferencedFunctorShouldCopy) {
958 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
959 background_thread->Start();
960
961 LifeCycleFunctor::Stats stats;
962 Event event;
963 LifeCycleFunctor functor(&stats, &event);
964 LifeCycleFunctor& functor_ref = functor;
965 background_thread->PostTask(RTC_FROM_HERE, functor_ref);
966 event.Wait(Event::kForever);
967
968 EXPECT_EQ(1u, stats.copy_count);
969 EXPECT_EQ(0u, stats.move_count);
970}
971
972TEST(ThreadPostTaskTest, InvokesWithCopiedFunctorDestroyedOnTargetThread) {
973 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
974 background_thread->Start();
975
976 Event event;
977 bool was_invoked_on_background_thread = false;
978 DestructionFunctor functor(background_thread.get(),
979 &was_invoked_on_background_thread, &event);
980 background_thread->PostTask(RTC_FROM_HERE, functor);
981 event.Wait(Event::kForever);
982
983 EXPECT_TRUE(was_invoked_on_background_thread);
984}
985
986TEST(ThreadPostTaskTest, InvokesWithMovedFunctorDestroyedOnTargetThread) {
987 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
988 background_thread->Start();
989
990 Event event;
991 bool was_invoked_on_background_thread = false;
992 DestructionFunctor functor(background_thread.get(),
993 &was_invoked_on_background_thread, &event);
994 background_thread->PostTask(RTC_FROM_HERE, std::move(functor));
995 event.Wait(Event::kForever);
996
997 EXPECT_TRUE(was_invoked_on_background_thread);
998}
999
1000TEST(ThreadPostTaskTest,
1001 InvokesWithReferencedFunctorShouldCopyAndDestroyedOnTargetThread) {
1002 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1003 background_thread->Start();
1004
1005 Event event;
1006 bool was_invoked_on_background_thread = false;
1007 DestructionFunctor functor(background_thread.get(),
1008 &was_invoked_on_background_thread, &event);
1009 DestructionFunctor& functor_ref = functor;
1010 background_thread->PostTask(RTC_FROM_HERE, functor_ref);
1011 event.Wait(Event::kForever);
1012
1013 EXPECT_TRUE(was_invoked_on_background_thread);
1014}
1015
1016TEST(ThreadPostTaskTest, InvokesOnBackgroundThread) {
1017 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1018 background_thread->Start();
1019
1020 Event event;
1021 bool was_invoked_on_background_thread = false;
1022 background_thread->PostTask(RTC_FROM_HERE,
1023 Bind(&ThreadIsCurrent, background_thread.get(),
1024 &was_invoked_on_background_thread, &event));
1025 event.Wait(Event::kForever);
1026
1027 EXPECT_TRUE(was_invoked_on_background_thread);
1028}
1029
1030TEST(ThreadPostTaskTest, InvokesAsynchronously) {
1031 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1032 background_thread->Start();
1033
1034 // The first event ensures that SendSingleMessage() is not blocking this
1035 // thread. The second event ensures that the message is processed.
1036 Event event_set_by_test_thread;
1037 Event event_set_by_background_thread;
1038 background_thread->PostTask(RTC_FROM_HERE,
1039 Bind(&WaitAndSetEvent, &event_set_by_test_thread,
1040 &event_set_by_background_thread));
1041 event_set_by_test_thread.Set();
1042 event_set_by_background_thread.Wait(Event::kForever);
1043}
1044
1045TEST(ThreadPostTaskTest, InvokesInPostedOrder) {
1046 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1047 background_thread->Start();
1048
1049 Event first;
1050 Event second;
1051 Event third;
1052 Event fourth;
1053
1054 background_thread->PostTask(RTC_FROM_HERE,
1055 Bind(&WaitAndSetEvent, &first, &second));
1056 background_thread->PostTask(RTC_FROM_HERE,
1057 Bind(&WaitAndSetEvent, &second, &third));
1058 background_thread->PostTask(RTC_FROM_HERE,
1059 Bind(&WaitAndSetEvent, &third, &fourth));
1060
1061 // All tasks have been posted before the first one is unblocked.
1062 first.Set();
1063 // Only if the chain is invoked in posted order will the last event be set.
1064 fourth.Wait(Event::kForever);
1065}
1066
Steve Antonbcc1a762019-12-11 11:21:53 -08001067TEST(ThreadPostDelayedTaskTest, InvokesAsynchronously) {
1068 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1069 background_thread->Start();
1070
1071 // The first event ensures that SendSingleMessage() is not blocking this
1072 // thread. The second event ensures that the message is processed.
1073 Event event_set_by_test_thread;
1074 Event event_set_by_background_thread;
1075 background_thread->PostDelayedTask(
1076 RTC_FROM_HERE,
1077 Bind(&WaitAndSetEvent, &event_set_by_test_thread,
1078 &event_set_by_background_thread),
1079 /*milliseconds=*/10);
1080 event_set_by_test_thread.Set();
1081 event_set_by_background_thread.Wait(Event::kForever);
1082}
1083
1084TEST(ThreadPostDelayedTaskTest, InvokesInDelayOrder) {
Steve Anton094396f2019-12-16 00:56:02 -08001085 ScopedFakeClock clock;
Steve Antonbcc1a762019-12-11 11:21:53 -08001086 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1087 background_thread->Start();
1088
1089 Event first;
1090 Event second;
1091 Event third;
1092 Event fourth;
1093
1094 background_thread->PostDelayedTask(RTC_FROM_HERE,
1095 Bind(&WaitAndSetEvent, &third, &fourth),
1096 /*milliseconds=*/11);
1097 background_thread->PostDelayedTask(RTC_FROM_HERE,
1098 Bind(&WaitAndSetEvent, &first, &second),
1099 /*milliseconds=*/9);
1100 background_thread->PostDelayedTask(RTC_FROM_HERE,
1101 Bind(&WaitAndSetEvent, &second, &third),
1102 /*milliseconds=*/10);
1103
1104 // All tasks have been posted before the first one is unblocked.
1105 first.Set();
Steve Anton094396f2019-12-16 00:56:02 -08001106 // Only if the chain is invoked in delay order will the last event be set.
Danil Chapovalov0c626af2020-02-10 11:16:00 +01001107 clock.AdvanceTime(webrtc::TimeDelta::Millis(11));
Steve Anton094396f2019-12-16 00:56:02 -08001108 EXPECT_TRUE(fourth.Wait(0));
Steve Antonbcc1a762019-12-11 11:21:53 -08001109}
1110
Tommi6866dc72020-05-15 10:11:56 +02001111TEST(ThreadPostDelayedTaskTest, IsCurrentTaskQueue) {
1112 auto current_tq = webrtc::TaskQueueBase::Current();
1113 {
1114 std::unique_ptr<rtc::Thread> thread(rtc::Thread::Create());
1115 thread->WrapCurrent();
1116 EXPECT_EQ(webrtc::TaskQueueBase::Current(),
1117 static_cast<webrtc::TaskQueueBase*>(thread.get()));
1118 thread->UnwrapCurrent();
1119 }
1120 EXPECT_EQ(webrtc::TaskQueueBase::Current(), current_tq);
1121}
1122
Danil Chapovalov912b3b82019-11-22 15:52:40 +01001123class ThreadFactory : public webrtc::TaskQueueFactory {
1124 public:
1125 std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
1126 CreateTaskQueue(absl::string_view /* name */,
1127 Priority /*priority*/) const override {
1128 std::unique_ptr<Thread> thread = Thread::Create();
1129 thread->Start();
1130 return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
1131 thread.release());
1132 }
1133};
1134
1135using ::webrtc::TaskQueueTest;
1136
1137INSTANTIATE_TEST_SUITE_P(RtcThread,
1138 TaskQueueTest,
1139 ::testing::Values(std::make_unique<ThreadFactory>));
1140
Mirko Bonadeie10b1632018-12-11 18:43:40 +01001141} // namespace
1142} // namespace rtc