blob: 86e429e72f8fd5f3bdeadd1b3559035151307b51 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
#include "rtc_base/thread.h"

#include <string.h>

#include <memory>

#include "api/task_queue/task_queue_factory.h"
#include "api/task_queue/task_queue_test.h"
#include "rtc_base/async_invoker.h"
#include "rtc_base/async_udp_socket.h"
#include "rtc_base/atomic_ops.h"
#include "rtc_base/event.h"
#include "rtc_base/gunit.h"
#include "rtc_base/internal/default_socket_server.h"
#include "rtc_base/null_socket_server.h"
#include "rtc_base/physical_socket_server.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "test/testsupport/rtc_expect_death.h"

#if defined(WEBRTC_WIN)
#include <comdef.h>  // NOLINT

#endif
35
Mirko Bonadeie10b1632018-12-11 18:43:40 +010036namespace rtc {
37namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000038
Sebastian Jansson73387822020-01-16 11:15:35 +010039using ::webrtc::ToQueuedTask;
40
// Generates a sequence of numbers (collaboratively): each call to Next() adds
// the caller-supplied value to the running total, reports the new total, and
// counts the call.
class TestGenerator {
 public:
  TestGenerator() : last(0), count(0) {}

  // Adds |prev| to the running total, bumps the call counter and returns the
  // updated total.
  int Next(int prev) {
    ++count;
    last += prev;
    return last;
  }

  int last;   // Value returned by the most recent Next(); 0 before any call.
  int count;  // Number of Next() calls made so far.
};
56
57struct TestMessage : public MessageData {
58 explicit TestMessage(int v) : value(v) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000059
60 int value;
61};
62
63// Receives on a socket and sends by posting messages.
64class SocketClient : public TestGenerator, public sigslot::has_slots<> {
65 public:
Yves Gerey665174f2018-06-19 15:03:05 +020066 SocketClient(AsyncSocket* socket,
67 const SocketAddress& addr,
68 Thread* post_thread,
69 MessageHandler* phandler)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000070 : socket_(AsyncUDPSocket::Create(socket, addr)),
71 post_thread_(post_thread),
72 post_handler_(phandler) {
73 socket_->SignalReadPacket.connect(this, &SocketClient::OnPacket);
74 }
75
Steve Anton9de3aac2017-10-24 10:08:26 -070076 ~SocketClient() override { delete socket_; }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000077
78 SocketAddress address() const { return socket_->GetLocalAddress(); }
79
Yves Gerey665174f2018-06-19 15:03:05 +020080 void OnPacket(AsyncPacketSocket* socket,
81 const char* buf,
82 size_t size,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000083 const SocketAddress& remote_addr,
Niels Möllere6933812018-11-05 13:01:41 +010084 const int64_t& packet_time_us) {
Peter Boström0c4e06b2015-10-07 12:23:21 +020085 EXPECT_EQ(size, sizeof(uint32_t));
86 uint32_t prev = reinterpret_cast<const uint32_t*>(buf)[0];
87 uint32_t result = Next(prev);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000088
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070089 post_thread_->PostDelayed(RTC_FROM_HERE, 200, post_handler_, 0,
90 new TestMessage(result));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000091 }
92
93 private:
94 AsyncUDPSocket* socket_;
95 Thread* post_thread_;
96 MessageHandler* post_handler_;
97};
98
99// Receives messages and sends on a socket.
Tomas Gunnarssonabdb4702020-09-05 18:43:36 +0200100class MessageClient : public MessageHandlerAutoCleanup, public TestGenerator {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000101 public:
Yves Gerey665174f2018-06-19 15:03:05 +0200102 MessageClient(Thread* pth, Socket* socket) : socket_(socket) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000103
Steve Anton9de3aac2017-10-24 10:08:26 -0700104 ~MessageClient() override { delete socket_; }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000105
Steve Anton9de3aac2017-10-24 10:08:26 -0700106 void OnMessage(Message* pmsg) override {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000107 TestMessage* msg = static_cast<TestMessage*>(pmsg->pdata);
108 int result = Next(msg->value);
109 EXPECT_GE(socket_->Send(&result, sizeof(result)), 0);
110 delete msg;
111 }
112
113 private:
114 Socket* socket_;
115};
116
deadbeefaea92932017-05-23 12:55:03 -0700117class CustomThread : public rtc::Thread {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000118 public:
tommie7251592017-07-14 14:44:46 -0700119 CustomThread()
120 : Thread(std::unique_ptr<SocketServer>(new rtc::NullSocketServer())) {}
Steve Anton9de3aac2017-10-24 10:08:26 -0700121 ~CustomThread() override { Stop(); }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000122 bool Start() { return false; }
jiayl@webrtc.orgba737cb2014-09-18 16:45:21 +0000123
Yves Gerey665174f2018-06-19 15:03:05 +0200124 bool WrapCurrent() { return Thread::WrapCurrent(); }
125 void UnwrapCurrent() { Thread::UnwrapCurrent(); }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000126};
127
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000128// A thread that does nothing when it runs and signals an event
129// when it is destroyed.
130class SignalWhenDestroyedThread : public Thread {
131 public:
132 SignalWhenDestroyedThread(Event* event)
tommie7251592017-07-14 14:44:46 -0700133 : Thread(std::unique_ptr<SocketServer>(new NullSocketServer())),
134 event_(event) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000135
Steve Anton9de3aac2017-10-24 10:08:26 -0700136 ~SignalWhenDestroyedThread() override {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000137 Stop();
138 event_->Set();
139 }
140
Steve Anton9de3aac2017-10-24 10:08:26 -0700141 void Run() override {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000142 // Do nothing.
143 }
144
145 private:
146 Event* event_;
147};
148
// A bool wrapped in a mutex, to avoid data races. Using a volatile
// bool should be sufficient for correct code ("eventual consistency"
// between caches is sufficient), but we can't tell the compiler about
// that, and then tsan complains about a data race.

// See also discussion at
// http://stackoverflow.com/questions/7223164/is-mutex-needed-to-synchronize-a-simple-flag-between-pthreads

// Using std::atomic<bool> or std::atomic_flag in C++11 is probably
// the right thing to do, but those features are not yet allowed. Or
// rtc::AtomicInt, if/when that is added. Since the use isn't
// performance critical, use a plain mutex for the time being.

162
163class AtomicBool {
164 public:
165 explicit AtomicBool(bool value = false) : flag_(value) {}
166 AtomicBool& operator=(bool value) {
Markus Handell4ab7dde2020-07-10 13:23:25 +0200167 webrtc::MutexLock scoped_lock(&mutex_);
nissed9b75be2015-11-16 00:54:07 -0800168 flag_ = value;
169 return *this;
170 }
171 bool get() const {
Markus Handell4ab7dde2020-07-10 13:23:25 +0200172 webrtc::MutexLock scoped_lock(&mutex_);
nissed9b75be2015-11-16 00:54:07 -0800173 return flag_;
174 }
175
176 private:
Markus Handell4ab7dde2020-07-10 13:23:25 +0200177 mutable webrtc::Mutex mutex_;
nissed9b75be2015-11-16 00:54:07 -0800178 bool flag_;
179};
180
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000181// Function objects to test Thread::Invoke.
// Functor returning a fixed, easily recognizable value.
struct FunctorA {
  int operator()() {
    const int kResult = 42;
    return kResult;
  }
};
185class FunctorB {
186 public:
nissed9b75be2015-11-16 00:54:07 -0800187 explicit FunctorB(AtomicBool* flag) : flag_(flag) {}
Yves Gerey665174f2018-06-19 15:03:05 +0200188 void operator()() {
189 if (flag_)
190 *flag_ = true;
191 }
192
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000193 private:
nissed9b75be2015-11-16 00:54:07 -0800194 AtomicBool* flag_;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000195};
196struct FunctorC {
197 int operator()() {
198 Thread::Current()->ProcessMessages(50);
199 return 24;
200 }
201};
Cameron Pickettd132ce12018-03-12 16:07:37 -0700202struct FunctorD {
203 public:
204 explicit FunctorD(AtomicBool* flag) : flag_(flag) {}
205 FunctorD(FunctorD&&) = default;
206 FunctorD& operator=(FunctorD&&) = default;
Yves Gerey665174f2018-06-19 15:03:05 +0200207 void operator()() {
208 if (flag_)
209 *flag_ = true;
210 }
211
Cameron Pickettd132ce12018-03-12 16:07:37 -0700212 private:
213 AtomicBool* flag_;
214 RTC_DISALLOW_COPY_AND_ASSIGN(FunctorD);
215};
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000216
217// See: https://code.google.com/p/webrtc/issues/detail?id=2409
218TEST(ThreadTest, DISABLED_Main) {
219 const SocketAddress addr("127.0.0.1", 0);
220
221 // Create the messaging client on its own thread.
tommie7251592017-07-14 14:44:46 -0700222 auto th1 = Thread::CreateWithSocketServer();
223 Socket* socket =
224 th1->socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM);
225 MessageClient msg_client(th1.get(), socket);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000226
227 // Create the socket client on its own thread.
tommie7251592017-07-14 14:44:46 -0700228 auto th2 = Thread::CreateWithSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000229 AsyncSocket* asocket =
tommie7251592017-07-14 14:44:46 -0700230 th2->socketserver()->CreateAsyncSocket(addr.family(), SOCK_DGRAM);
231 SocketClient sock_client(asocket, addr, th1.get(), &msg_client);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000232
233 socket->Connect(sock_client.address());
234
tommie7251592017-07-14 14:44:46 -0700235 th1->Start();
236 th2->Start();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000237
238 // Get the messages started.
tommie7251592017-07-14 14:44:46 -0700239 th1->PostDelayed(RTC_FROM_HERE, 100, &msg_client, 0, new TestMessage(1));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000240
241 // Give the clients a little while to run.
242 // Messages will be processed at 100, 300, 500, 700, 900.
243 Thread* th_main = Thread::Current();
244 th_main->ProcessMessages(1000);
245
246 // Stop the sending client. Give the receiver a bit longer to run, in case
247 // it is running on a machine that is under load (e.g. the build machine).
tommie7251592017-07-14 14:44:46 -0700248 th1->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000249 th_main->ProcessMessages(200);
tommie7251592017-07-14 14:44:46 -0700250 th2->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000251
252 // Make sure the results were correct
253 EXPECT_EQ(5, msg_client.count);
254 EXPECT_EQ(34, msg_client.last);
255 EXPECT_EQ(5, sock_client.count);
256 EXPECT_EQ(55, sock_client.last);
257}
258
Tommife041642021-04-07 10:08:28 +0200259TEST(ThreadTest, CountBlockingCalls) {
260 // When the test runs, this will print out:
261 // (thread_unittest.cc:262): Blocking TestBody: total=2 (actual=1, could=1)
262 RTC_LOG_THREAD_BLOCK_COUNT();
263#if RTC_DCHECK_IS_ON
264 rtc::Thread* current = rtc::Thread::Current();
265 ASSERT_TRUE(current);
266 rtc::Thread::ScopedCountBlockingCalls blocked_calls(
267 [&](uint32_t actual_block, uint32_t could_block) {
268 EXPECT_EQ(1u, actual_block);
269 EXPECT_EQ(1u, could_block);
270 });
271
272 EXPECT_EQ(0u, blocked_calls.GetBlockingCallCount());
273 EXPECT_EQ(0u, blocked_calls.GetCouldBeBlockingCallCount());
274 EXPECT_EQ(0u, blocked_calls.GetTotalBlockedCallCount());
275
276 // Test invoking on the current thread. This should not count as an 'actual'
277 // invoke, but should still count as an invoke that could block since we
278 // that the call to Invoke serves a purpose in some configurations (and should
279 // not be used a general way to call methods on the same thread).
280 current->Invoke<void>(RTC_FROM_HERE, []() {});
281 EXPECT_EQ(0u, blocked_calls.GetBlockingCallCount());
282 EXPECT_EQ(1u, blocked_calls.GetCouldBeBlockingCallCount());
283 EXPECT_EQ(1u, blocked_calls.GetTotalBlockedCallCount());
284
285 // Create a new thread to invoke on.
286 auto thread = Thread::CreateWithSocketServer();
287 thread->Start();
288 EXPECT_EQ(42, thread->Invoke<int>(RTC_FROM_HERE, []() { return 42; }));
289 EXPECT_EQ(1u, blocked_calls.GetBlockingCallCount());
290 EXPECT_EQ(1u, blocked_calls.GetCouldBeBlockingCallCount());
291 EXPECT_EQ(2u, blocked_calls.GetTotalBlockedCallCount());
292 thread->Stop();
293 RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(2);
294#else
295 RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0);
296 RTC_LOG(LS_INFO) << "Test not active in this config";
297#endif
298}
299
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000300// Test that setting thread names doesn't cause a malfunction.
301// There's no easy way to verify the name was set properly at this time.
henrike@webrtc.orgc732a3e2014-10-09 22:08:15 +0000302TEST(ThreadTest, Names) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000303 // Default name
tommie7251592017-07-14 14:44:46 -0700304 auto thread = Thread::CreateWithSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000305 EXPECT_TRUE(thread->Start());
306 thread->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000307 // Name with no object parameter
tommie7251592017-07-14 14:44:46 -0700308 thread = Thread::CreateWithSocketServer();
deadbeef37f5ecf2017-02-27 14:06:41 -0800309 EXPECT_TRUE(thread->SetName("No object", nullptr));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000310 EXPECT_TRUE(thread->Start());
311 thread->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000312 // Really long name
tommie7251592017-07-14 14:44:46 -0700313 thread = Thread::CreateWithSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000314 EXPECT_TRUE(thread->SetName("Abcdefghijklmnopqrstuvwxyz1234567890", this));
315 EXPECT_TRUE(thread->Start());
316 thread->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000317}
318
henrike@webrtc.orge30dab72014-10-09 15:41:40 +0000319TEST(ThreadTest, Wrap) {
320 Thread* current_thread = Thread::Current();
Niels Möller5a8f8602019-06-12 11:30:59 +0200321 ThreadManager::Instance()->SetCurrentThread(nullptr);
322
323 {
324 CustomThread cthread;
325 EXPECT_TRUE(cthread.WrapCurrent());
326 EXPECT_EQ(&cthread, Thread::Current());
327 EXPECT_TRUE(cthread.RunningForTest());
328 EXPECT_FALSE(cthread.IsOwned());
329 cthread.UnwrapCurrent();
330 EXPECT_FALSE(cthread.RunningForTest());
331 }
332 ThreadManager::Instance()->SetCurrentThread(current_thread);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000333}
334
Artem Titovdfc5f0d2020-07-03 12:09:26 +0200335#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
336TEST(ThreadTest, InvokeToThreadAllowedReturnsTrueWithoutPolicies) {
337 // Create and start the thread.
338 auto thread1 = Thread::CreateWithSocketServer();
339 auto thread2 = Thread::CreateWithSocketServer();
340
341 thread1->PostTask(ToQueuedTask(
342 [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
343 Thread* th_main = Thread::Current();
344 th_main->ProcessMessages(100);
345}
346
347TEST(ThreadTest, InvokeAllowedWhenThreadsAdded) {
348 // Create and start the thread.
349 auto thread1 = Thread::CreateWithSocketServer();
350 auto thread2 = Thread::CreateWithSocketServer();
351 auto thread3 = Thread::CreateWithSocketServer();
352 auto thread4 = Thread::CreateWithSocketServer();
353
354 thread1->AllowInvokesToThread(thread2.get());
355 thread1->AllowInvokesToThread(thread3.get());
356
357 thread1->PostTask(ToQueuedTask([&]() {
358 EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get()));
359 EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get()));
360 EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get()));
361 }));
362 Thread* th_main = Thread::Current();
363 th_main->ProcessMessages(100);
364}
365
366TEST(ThreadTest, InvokesDisallowedWhenDisallowAllInvokes) {
367 // Create and start the thread.
368 auto thread1 = Thread::CreateWithSocketServer();
369 auto thread2 = Thread::CreateWithSocketServer();
370
371 thread1->DisallowAllInvokes();
372
373 thread1->PostTask(ToQueuedTask([&]() {
374 EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get()));
375 }));
376 Thread* th_main = Thread::Current();
377 th_main->ProcessMessages(100);
378}
379#endif // (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
380
381TEST(ThreadTest, InvokesAllowedByDefault) {
382 // Create and start the thread.
383 auto thread1 = Thread::CreateWithSocketServer();
384 auto thread2 = Thread::CreateWithSocketServer();
385
386 thread1->PostTask(ToQueuedTask(
387 [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
388 Thread* th_main = Thread::Current();
389 th_main->ProcessMessages(100);
390}
391
henrike@webrtc.orgc732a3e2014-10-09 22:08:15 +0000392TEST(ThreadTest, Invoke) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000393 // Create and start the thread.
tommie7251592017-07-14 14:44:46 -0700394 auto thread = Thread::CreateWithSocketServer();
395 thread->Start();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000396 // Try calling functors.
tommie7251592017-07-14 14:44:46 -0700397 EXPECT_EQ(42, thread->Invoke<int>(RTC_FROM_HERE, FunctorA()));
nissed9b75be2015-11-16 00:54:07 -0800398 AtomicBool called;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000399 FunctorB f2(&called);
tommie7251592017-07-14 14:44:46 -0700400 thread->Invoke<void>(RTC_FROM_HERE, f2);
nissed9b75be2015-11-16 00:54:07 -0800401 EXPECT_TRUE(called.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000402 // Try calling bare functions.
403 struct LocalFuncs {
404 static int Func1() { return 999; }
405 static void Func2() {}
406 };
tommie7251592017-07-14 14:44:46 -0700407 EXPECT_EQ(999, thread->Invoke<int>(RTC_FROM_HERE, &LocalFuncs::Func1));
408 thread->Invoke<void>(RTC_FROM_HERE, &LocalFuncs::Func2);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000409}
410
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000411// Verifies that two threads calling Invoke on each other at the same time does
Sebastian Janssonda7267a2020-03-03 10:48:05 +0100412// not deadlock but crash.
413#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
414TEST(ThreadTest, TwoThreadsInvokeDeathTest) {
415 ::testing::GTEST_FLAG(death_test_style) = "threadsafe";
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000416 AutoThread thread;
Sebastian Janssonda7267a2020-03-03 10:48:05 +0100417 Thread* main_thread = Thread::Current();
tommie7251592017-07-14 14:44:46 -0700418 auto other_thread = Thread::CreateWithSocketServer();
419 other_thread->Start();
Sebastian Janssonda7267a2020-03-03 10:48:05 +0100420 other_thread->Invoke<void>(RTC_FROM_HERE, [main_thread] {
421 RTC_EXPECT_DEATH(main_thread->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
422 });
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000423}
424
Sebastian Janssonda7267a2020-03-03 10:48:05 +0100425TEST(ThreadTest, ThreeThreadsInvokeDeathTest) {
426 ::testing::GTEST_FLAG(death_test_style) = "threadsafe";
427 AutoThread thread;
428 Thread* first = Thread::Current();
429
430 auto second = Thread::Create();
431 second->Start();
432 auto third = Thread::Create();
433 third->Start();
434
435 second->Invoke<void>(RTC_FROM_HERE, [&] {
436 third->Invoke<void>(RTC_FROM_HERE, [&] {
437 RTC_EXPECT_DEATH(first->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
438 });
439 });
440}
441
442#endif
443
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000444// Verifies that if thread A invokes a call on thread B and thread C is trying
445// to invoke A at the same time, thread A does not handle C's invoke while
446// invoking B.
447TEST(ThreadTest, ThreeThreadsInvoke) {
448 AutoThread thread;
449 Thread* thread_a = Thread::Current();
tommie7251592017-07-14 14:44:46 -0700450 auto thread_b = Thread::CreateWithSocketServer();
451 auto thread_c = Thread::CreateWithSocketServer();
452 thread_b->Start();
453 thread_c->Start();
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000454
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000455 class LockedBool {
456 public:
457 explicit LockedBool(bool value) : value_(value) {}
458
459 void Set(bool value) {
Markus Handell4ab7dde2020-07-10 13:23:25 +0200460 webrtc::MutexLock lock(&mutex_);
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000461 value_ = value;
462 }
463
464 bool Get() {
Markus Handell4ab7dde2020-07-10 13:23:25 +0200465 webrtc::MutexLock lock(&mutex_);
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000466 return value_;
467 }
468
469 private:
Markus Handell4ab7dde2020-07-10 13:23:25 +0200470 webrtc::Mutex mutex_;
471 bool value_ RTC_GUARDED_BY(mutex_);
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000472 };
473
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000474 struct LocalFuncs {
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000475 static void Set(LockedBool* out) { out->Set(true); }
476 static void InvokeSet(Thread* thread, LockedBool* out) {
Niels Möller1a29a5d2021-01-18 11:35:23 +0100477 thread->Invoke<void>(RTC_FROM_HERE, [out] { Set(out); });
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000478 }
479
480 // Set |out| true and call InvokeSet on |thread|.
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000481 static void SetAndInvokeSet(LockedBool* out,
482 Thread* thread,
483 LockedBool* out_inner) {
484 out->Set(true);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000485 InvokeSet(thread, out_inner);
486 }
487
488 // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
489 // |thread1| starts the call.
deadbeef162cb532017-02-23 17:10:07 -0800490 static void AsyncInvokeSetAndWait(AsyncInvoker* invoker,
491 Thread* thread1,
492 Thread* thread2,
493 LockedBool* out) {
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000494 LockedBool async_invoked(false);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000495
deadbeef162cb532017-02-23 17:10:07 -0800496 invoker->AsyncInvoke<void>(
Niels Möller1a29a5d2021-01-18 11:35:23 +0100497 RTC_FROM_HERE, thread1, [&async_invoked, thread2, out] {
498 SetAndInvokeSet(&async_invoked, thread2, out);
499 });
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000500
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000501 EXPECT_TRUE_WAIT(async_invoked.Get(), 2000);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000502 }
503 };
504
deadbeef162cb532017-02-23 17:10:07 -0800505 AsyncInvoker invoker;
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000506 LockedBool thread_a_called(false);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000507
508 // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
509 // Thread B returns when C receives the call and C should be blocked until A
510 // starts to process messages.
Niels Möller1a29a5d2021-01-18 11:35:23 +0100511 Thread* thread_c_ptr = thread_c.get();
512 thread_b->Invoke<void>(
513 RTC_FROM_HERE, [&invoker, thread_c_ptr, thread_a, &thread_a_called] {
514 LocalFuncs::AsyncInvokeSetAndWait(&invoker, thread_c_ptr, thread_a,
515 &thread_a_called);
516 });
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000517 EXPECT_FALSE(thread_a_called.Get());
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000518
pbos@webrtc.orge93cbd12014-10-15 14:54:56 +0000519 EXPECT_TRUE_WAIT(thread_a_called.Get(), 2000);
jiayl@webrtc.org3987b6d2014-09-24 17:14:05 +0000520}
521
jbauch25d1f282016-02-05 00:25:02 -0800522// Set the name on a thread when the underlying QueueDestroyed signal is
523// triggered. This causes an error if the object is already partially
524// destroyed.
525class SetNameOnSignalQueueDestroyedTester : public sigslot::has_slots<> {
526 public:
527 SetNameOnSignalQueueDestroyedTester(Thread* thread) : thread_(thread) {
528 thread->SignalQueueDestroyed.connect(
529 this, &SetNameOnSignalQueueDestroyedTester::OnQueueDestroyed);
530 }
531
532 void OnQueueDestroyed() {
533 // Makes sure that if we access the Thread while it's being destroyed, that
534 // it doesn't cause a problem because the vtable has been modified.
535 thread_->SetName("foo", nullptr);
536 }
537
538 private:
539 Thread* thread_;
540};
541
542TEST(ThreadTest, SetNameOnSignalQueueDestroyed) {
tommie7251592017-07-14 14:44:46 -0700543 auto thread1 = Thread::CreateWithSocketServer();
544 SetNameOnSignalQueueDestroyedTester tester1(thread1.get());
545 thread1.reset();
jbauch25d1f282016-02-05 00:25:02 -0800546
547 Thread* thread2 = new AutoThread();
548 SetNameOnSignalQueueDestroyedTester tester2(thread2);
549 delete thread2;
jbauch25d1f282016-02-05 00:25:02 -0800550}
551
Sebastian Jansson73387822020-01-16 11:15:35 +0100552class ThreadQueueTest : public ::testing::Test, public Thread {
553 public:
Mirko Bonadeie5f4c6b2021-01-15 10:41:01 +0100554 ThreadQueueTest() : Thread(CreateDefaultSocketServer(), true) {}
Sebastian Jansson73387822020-01-16 11:15:35 +0100555 bool IsLocked_Worker() {
556 if (!CritForTest()->TryEnter()) {
557 return true;
558 }
559 CritForTest()->Leave();
560 return false;
561 }
562 bool IsLocked() {
563 // We have to do this on a worker thread, or else the TryEnter will
564 // succeed, since our critical sections are reentrant.
565 std::unique_ptr<Thread> worker(Thread::CreateWithSocketServer());
566 worker->Start();
Niels Möller1a29a5d2021-01-18 11:35:23 +0100567 return worker->Invoke<bool>(RTC_FROM_HERE,
568 [this] { return IsLocked_Worker(); });
Sebastian Jansson73387822020-01-16 11:15:35 +0100569 }
570};
571
572struct DeletedLockChecker {
573 DeletedLockChecker(ThreadQueueTest* test, bool* was_locked, bool* deleted)
574 : test(test), was_locked(was_locked), deleted(deleted) {}
575 ~DeletedLockChecker() {
576 *deleted = true;
577 *was_locked = test->IsLocked();
578 }
579 ThreadQueueTest* test;
580 bool* was_locked;
581 bool* deleted;
582};
583
584static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) {
585 EXPECT_TRUE(q != nullptr);
586 int64_t now = TimeMillis();
587 q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
588 q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0);
589 q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1);
590 q->PostAt(RTC_FROM_HERE, now, nullptr, 4);
591 q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2);
592
593 Message msg;
594 for (size_t i = 0; i < 5; ++i) {
595 memset(&msg, 0, sizeof(msg));
596 EXPECT_TRUE(q->Get(&msg, 0));
597 EXPECT_EQ(i, msg.message_id);
598 }
599
600 EXPECT_FALSE(q->Get(&msg, 0)); // No more messages
601}
602
603TEST_F(ThreadQueueTest, DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
Mirko Bonadeie5f4c6b2021-01-15 10:41:01 +0100604 Thread q(CreateDefaultSocketServer(), true);
Sebastian Jansson73387822020-01-16 11:15:35 +0100605 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
606
607 NullSocketServer nullss;
608 Thread q_nullss(&nullss, true);
609 DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
610}
611
612TEST_F(ThreadQueueTest, DisposeNotLocked) {
613 bool was_locked = true;
614 bool deleted = false;
615 DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted);
616 Dispose(d);
617 Message msg;
618 EXPECT_FALSE(Get(&msg, 0));
619 EXPECT_TRUE(deleted);
620 EXPECT_FALSE(was_locked);
621}
622
Tomas Gunnarssonabdb4702020-09-05 18:43:36 +0200623class DeletedMessageHandler : public MessageHandlerAutoCleanup {
Sebastian Jansson73387822020-01-16 11:15:35 +0100624 public:
625 explicit DeletedMessageHandler(bool* deleted) : deleted_(deleted) {}
626 ~DeletedMessageHandler() override { *deleted_ = true; }
627 void OnMessage(Message* msg) override {}
628
629 private:
630 bool* deleted_;
631};
632
633TEST_F(ThreadQueueTest, DiposeHandlerWithPostedMessagePending) {
634 bool deleted = false;
635 DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted);
636 // First, post a dispose.
637 Dispose(handler);
638 // Now, post a message, which should *not* be returned by Get().
639 Post(RTC_FROM_HERE, handler, 1);
640 Message msg;
641 EXPECT_FALSE(Get(&msg, 0));
642 EXPECT_TRUE(deleted);
643}
644
645// Ensure that ProcessAllMessageQueues does its essential function; process
646// all messages (both delayed and non delayed) up until the current time, on
647// all registered message queues.
648TEST(ThreadManager, ProcessAllMessageQueues) {
649 Event entered_process_all_message_queues(true, false);
650 auto a = Thread::CreateWithSocketServer();
651 auto b = Thread::CreateWithSocketServer();
652 a->Start();
653 b->Start();
654
655 volatile int messages_processed = 0;
656 auto incrementer = [&messages_processed,
657 &entered_process_all_message_queues] {
658 // Wait for event as a means to ensure Increment doesn't occur outside
659 // of ProcessAllMessageQueues. The event is set by a message posted to
660 // the main thread, which is guaranteed to be handled inside
661 // ProcessAllMessageQueues.
662 entered_process_all_message_queues.Wait(Event::kForever);
663 AtomicOps::Increment(&messages_processed);
664 };
665 auto event_signaler = [&entered_process_all_message_queues] {
666 entered_process_all_message_queues.Set();
667 };
668
669 // Post messages (both delayed and non delayed) to both threads.
670 a->PostTask(ToQueuedTask(incrementer));
671 b->PostTask(ToQueuedTask(incrementer));
672 a->PostDelayedTask(ToQueuedTask(incrementer), 0);
673 b->PostDelayedTask(ToQueuedTask(incrementer), 0);
674 rtc::Thread::Current()->PostTask(ToQueuedTask(event_signaler));
675
676 ThreadManager::ProcessAllMessageQueuesForTesting();
677 EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed));
678}
679
680// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
681TEST(ThreadManager, ProcessAllMessageQueuesWithQuittingThread) {
682 auto t = Thread::CreateWithSocketServer();
683 t->Start();
684 t->Quit();
685 ThreadManager::ProcessAllMessageQueuesForTesting();
686}
687
688// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
689// messages.
690TEST(ThreadManager, ProcessAllMessageQueuesWithClearedQueue) {
691 Event entered_process_all_message_queues(true, false);
692 auto t = Thread::CreateWithSocketServer();
693 t->Start();
694
695 auto clearer = [&entered_process_all_message_queues] {
696 // Wait for event as a means to ensure Clear doesn't occur outside of
697 // ProcessAllMessageQueues. The event is set by a message posted to the
698 // main thread, which is guaranteed to be handled inside
699 // ProcessAllMessageQueues.
700 entered_process_all_message_queues.Wait(Event::kForever);
701 rtc::Thread::Current()->Clear(nullptr);
702 };
703 auto event_signaler = [&entered_process_all_message_queues] {
704 entered_process_all_message_queues.Set();
705 };
706
707 // Post messages (both delayed and non delayed) to both threads.
708 t->PostTask(RTC_FROM_HERE, clearer);
709 rtc::Thread::Current()->PostTask(RTC_FROM_HERE, event_signaler);
710 ThreadManager::ProcessAllMessageQueuesForTesting();
711}
712
Tomas Gunnarssonabdb4702020-09-05 18:43:36 +0200713class RefCountedHandler : public MessageHandlerAutoCleanup,
714 public rtc::RefCountInterface {
Sebastian Jansson73387822020-01-16 11:15:35 +0100715 public:
716 void OnMessage(Message* msg) override {}
717};
718
Tomas Gunnarssonabdb4702020-09-05 18:43:36 +0200719class EmptyHandler : public MessageHandlerAutoCleanup {
Sebastian Jansson73387822020-01-16 11:15:35 +0100720 public:
721 void OnMessage(Message* msg) override {}
722};
723
724TEST(ThreadManager, ClearReentrant) {
725 std::unique_ptr<Thread> t(Thread::Create());
726 EmptyHandler handler;
727 RefCountedHandler* inner_handler(
728 new rtc::RefCountedObject<RefCountedHandler>());
729 // When the empty handler is destroyed, it will clear messages queued for
730 // itself. The message to be cleared itself wraps a MessageHandler object
731 // (RefCountedHandler) so this will cause the message queue to be cleared
732 // again in a re-entrant fashion, which previously triggered a DCHECK.
733 // The inner handler will be removed in a re-entrant fashion from the
734 // message queue of the thread while the outer handler is removed, verifying
735 // that the iterator is not invalidated in "MessageQueue::Clear".
736 t->Post(RTC_FROM_HERE, inner_handler, 0);
737 t->Post(RTC_FROM_HERE, &handler, 0,
738 new ScopedRefMessageData<RefCountedHandler>(inner_handler));
739}
740
Mirko Bonadei6a489f22019-04-09 15:11:12 +0200741class AsyncInvokeTest : public ::testing::Test {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000742 public:
743 void IntCallback(int value) {
744 EXPECT_EQ(expected_thread_, Thread::Current());
745 int_value_ = value;
746 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000747 void SetExpectedThreadForIntCallback(Thread* thread) {
748 expected_thread_ = thread;
749 }
750
751 protected:
752 enum { kWaitTimeout = 1000 };
Yves Gerey665174f2018-06-19 15:03:05 +0200753 AsyncInvokeTest() : int_value_(0), expected_thread_(nullptr) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000754
755 int int_value_;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000756 Thread* expected_thread_;
757};
758
henrike@webrtc.orge30dab72014-10-09 15:41:40 +0000759TEST_F(AsyncInvokeTest, FireAndForget) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000760 AsyncInvoker invoker;
761 // Create and start the thread.
tommie7251592017-07-14 14:44:46 -0700762 auto thread = Thread::CreateWithSocketServer();
763 thread->Start();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000764 // Try calling functor.
nissed9b75be2015-11-16 00:54:07 -0800765 AtomicBool called;
tommie7251592017-07-14 14:44:46 -0700766 invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(), FunctorB(&called));
nissed9b75be2015-11-16 00:54:07 -0800767 EXPECT_TRUE_WAIT(called.get(), kWaitTimeout);
tommie7251592017-07-14 14:44:46 -0700768 thread->Stop();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000769}
770
Cameron Pickettd132ce12018-03-12 16:07:37 -0700771TEST_F(AsyncInvokeTest, NonCopyableFunctor) {
772 AsyncInvoker invoker;
773 // Create and start the thread.
774 auto thread = Thread::CreateWithSocketServer();
775 thread->Start();
776 // Try calling functor.
777 AtomicBool called;
778 invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(), FunctorD(&called));
779 EXPECT_TRUE_WAIT(called.get(), kWaitTimeout);
780 thread->Stop();
781}
782
deadbeef162cb532017-02-23 17:10:07 -0800783TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) {
784 // Use these events to get in a state where the functor is in the middle of
785 // executing, and then to wait for it to finish, ensuring the "EXPECT_FALSE"
786 // is run.
Niels Möllerc572ff32018-11-07 08:43:50 +0100787 Event functor_started;
788 Event functor_continue;
789 Event functor_finished;
deadbeef162cb532017-02-23 17:10:07 -0800790
tommie7251592017-07-14 14:44:46 -0700791 auto thread = Thread::CreateWithSocketServer();
792 thread->Start();
deadbeef162cb532017-02-23 17:10:07 -0800793 volatile bool invoker_destroyed = false;
794 {
deadbeef3af63b02017-08-08 17:59:47 -0700795 auto functor = [&functor_started, &functor_continue, &functor_finished,
796 &invoker_destroyed] {
797 functor_started.Set();
798 functor_continue.Wait(Event::kForever);
799 rtc::Thread::Current()->SleepMs(kWaitTimeout);
800 EXPECT_FALSE(invoker_destroyed);
801 functor_finished.Set();
802 };
deadbeef162cb532017-02-23 17:10:07 -0800803 AsyncInvoker invoker;
deadbeef3af63b02017-08-08 17:59:47 -0700804 invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(), functor);
deadbeef162cb532017-02-23 17:10:07 -0800805 functor_started.Wait(Event::kForever);
deadbeefaea92932017-05-23 12:55:03 -0700806
deadbeef3af63b02017-08-08 17:59:47 -0700807 // Destroy the invoker while the functor is still executing (doing
808 // SleepMs).
deadbeefaea92932017-05-23 12:55:03 -0700809 functor_continue.Set();
deadbeef162cb532017-02-23 17:10:07 -0800810 }
811
812 // If the destructor DIDN'T wait for the functor to finish executing, it will
813 // hit the EXPECT_FALSE(invoker_destroyed) after it finishes sleeping for a
814 // second.
815 invoker_destroyed = true;
816 functor_finished.Wait(Event::kForever);
817}
818
deadbeef3af63b02017-08-08 17:59:47 -0700819// Variant of the above test where the async-invoked task calls AsyncInvoke
820// *again*, for the thread on which the AsyncInvoker is currently being
821// destroyed. This shouldn't deadlock or crash; this second invocation should
822// just be ignored.
823TEST_F(AsyncInvokeTest, KillInvokerDuringExecuteWithReentrantInvoke) {
Niels Möllerc572ff32018-11-07 08:43:50 +0100824 Event functor_started;
deadbeef3af63b02017-08-08 17:59:47 -0700825 // Flag used to verify that the recursively invoked task never actually runs.
826 bool reentrant_functor_run = false;
827
828 Thread* main = Thread::Current();
Mirko Bonadei317a1f02019-09-17 17:06:18 +0200829 Thread thread(std::make_unique<NullSocketServer>());
deadbeef3af63b02017-08-08 17:59:47 -0700830 thread.Start();
831 {
832 AsyncInvoker invoker;
833 auto reentrant_functor = [&reentrant_functor_run] {
834 reentrant_functor_run = true;
835 };
836 auto functor = [&functor_started, &invoker, main, reentrant_functor] {
837 functor_started.Set();
838 Thread::Current()->SleepMs(kWaitTimeout);
839 invoker.AsyncInvoke<void>(RTC_FROM_HERE, main, reentrant_functor);
840 };
841 // This queues a task on |thread| to sleep for |kWaitTimeout| then queue a
842 // task on |main|. But this second queued task should never run, since the
843 // destructor will be entered before it's even invoked.
844 invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread, functor);
845 functor_started.Wait(Event::kForever);
846 }
847 EXPECT_FALSE(reentrant_functor_run);
848}
849
henrike@webrtc.orgc732a3e2014-10-09 22:08:15 +0000850TEST_F(AsyncInvokeTest, Flush) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000851 AsyncInvoker invoker;
nissed9b75be2015-11-16 00:54:07 -0800852 AtomicBool flag1;
853 AtomicBool flag2;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000854 // Queue two async calls to the current thread.
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700855 invoker.AsyncInvoke<void>(RTC_FROM_HERE, Thread::Current(), FunctorB(&flag1));
856 invoker.AsyncInvoke<void>(RTC_FROM_HERE, Thread::Current(), FunctorB(&flag2));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000857 // Because we haven't pumped messages, these should not have run yet.
nissed9b75be2015-11-16 00:54:07 -0800858 EXPECT_FALSE(flag1.get());
859 EXPECT_FALSE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000860 // Force them to run now.
861 invoker.Flush(Thread::Current());
nissed9b75be2015-11-16 00:54:07 -0800862 EXPECT_TRUE(flag1.get());
863 EXPECT_TRUE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000864}
865
henrike@webrtc.orgc732a3e2014-10-09 22:08:15 +0000866TEST_F(AsyncInvokeTest, FlushWithIds) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000867 AsyncInvoker invoker;
nissed9b75be2015-11-16 00:54:07 -0800868 AtomicBool flag1;
869 AtomicBool flag2;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000870 // Queue two async calls to the current thread, one with a message id.
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700871 invoker.AsyncInvoke<void>(RTC_FROM_HERE, Thread::Current(), FunctorB(&flag1),
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000872 5);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700873 invoker.AsyncInvoke<void>(RTC_FROM_HERE, Thread::Current(), FunctorB(&flag2));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000874 // Because we haven't pumped messages, these should not have run yet.
nissed9b75be2015-11-16 00:54:07 -0800875 EXPECT_FALSE(flag1.get());
876 EXPECT_FALSE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000877 // Execute pending calls with id == 5.
878 invoker.Flush(Thread::Current(), 5);
nissed9b75be2015-11-16 00:54:07 -0800879 EXPECT_TRUE(flag1.get());
880 EXPECT_FALSE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000881 flag1 = false;
882 // Execute all pending calls. The id == 5 call should not execute again.
883 invoker.Flush(Thread::Current());
nissed9b75be2015-11-16 00:54:07 -0800884 EXPECT_FALSE(flag1.get());
885 EXPECT_TRUE(flag2.get());
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000886}
887
Henrik Boströmba4dcc32019-02-28 09:34:06 +0100888void WaitAndSetEvent(Event* wait_event, Event* set_event) {
889 wait_event->Wait(Event::kForever);
890 set_event->Set();
891}
892
893// A functor that keeps track of the number of copies and moves.
894class LifeCycleFunctor {
895 public:
896 struct Stats {
897 size_t copy_count = 0;
898 size_t move_count = 0;
899 };
900
901 LifeCycleFunctor(Stats* stats, Event* event) : stats_(stats), event_(event) {}
902 LifeCycleFunctor(const LifeCycleFunctor& other) { *this = other; }
903 LifeCycleFunctor(LifeCycleFunctor&& other) { *this = std::move(other); }
904
905 LifeCycleFunctor& operator=(const LifeCycleFunctor& other) {
906 stats_ = other.stats_;
907 event_ = other.event_;
908 ++stats_->copy_count;
909 return *this;
910 }
911
912 LifeCycleFunctor& operator=(LifeCycleFunctor&& other) {
913 stats_ = other.stats_;
914 event_ = other.event_;
915 ++stats_->move_count;
916 return *this;
917 }
918
919 void operator()() { event_->Set(); }
920
921 private:
922 Stats* stats_;
923 Event* event_;
924};
925
926// A functor that verifies the thread it was destroyed on.
927class DestructionFunctor {
928 public:
929 DestructionFunctor(Thread* thread, bool* thread_was_current, Event* event)
930 : thread_(thread),
931 thread_was_current_(thread_was_current),
932 event_(event) {}
933 ~DestructionFunctor() {
934 // Only signal the event if this was the functor that was invoked to avoid
935 // the event being signaled due to the destruction of temporary/moved
936 // versions of this object.
937 if (was_invoked_) {
938 *thread_was_current_ = thread_->IsCurrent();
939 event_->Set();
940 }
941 }
942
943 void operator()() { was_invoked_ = true; }
944
945 private:
946 Thread* thread_;
947 bool* thread_was_current_;
948 Event* event_;
949 bool was_invoked_ = false;
950};
951
Henrik Boströmba4dcc32019-02-28 09:34:06 +0100952TEST(ThreadPostTaskTest, InvokesWithLambda) {
953 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
954 background_thread->Start();
955
956 Event event;
957 background_thread->PostTask(RTC_FROM_HERE, [&event] { event.Set(); });
958 event.Wait(Event::kForever);
959}
960
961TEST(ThreadPostTaskTest, InvokesWithCopiedFunctor) {
962 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
963 background_thread->Start();
964
965 LifeCycleFunctor::Stats stats;
966 Event event;
967 LifeCycleFunctor functor(&stats, &event);
968 background_thread->PostTask(RTC_FROM_HERE, functor);
969 event.Wait(Event::kForever);
970
971 EXPECT_EQ(1u, stats.copy_count);
972 EXPECT_EQ(0u, stats.move_count);
973}
974
975TEST(ThreadPostTaskTest, InvokesWithMovedFunctor) {
976 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
977 background_thread->Start();
978
979 LifeCycleFunctor::Stats stats;
980 Event event;
981 LifeCycleFunctor functor(&stats, &event);
982 background_thread->PostTask(RTC_FROM_HERE, std::move(functor));
983 event.Wait(Event::kForever);
984
985 EXPECT_EQ(0u, stats.copy_count);
986 EXPECT_EQ(1u, stats.move_count);
987}
988
989TEST(ThreadPostTaskTest, InvokesWithReferencedFunctorShouldCopy) {
990 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
991 background_thread->Start();
992
993 LifeCycleFunctor::Stats stats;
994 Event event;
995 LifeCycleFunctor functor(&stats, &event);
996 LifeCycleFunctor& functor_ref = functor;
997 background_thread->PostTask(RTC_FROM_HERE, functor_ref);
998 event.Wait(Event::kForever);
999
1000 EXPECT_EQ(1u, stats.copy_count);
1001 EXPECT_EQ(0u, stats.move_count);
1002}
1003
1004TEST(ThreadPostTaskTest, InvokesWithCopiedFunctorDestroyedOnTargetThread) {
1005 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1006 background_thread->Start();
1007
1008 Event event;
1009 bool was_invoked_on_background_thread = false;
1010 DestructionFunctor functor(background_thread.get(),
1011 &was_invoked_on_background_thread, &event);
1012 background_thread->PostTask(RTC_FROM_HERE, functor);
1013 event.Wait(Event::kForever);
1014
1015 EXPECT_TRUE(was_invoked_on_background_thread);
1016}
1017
1018TEST(ThreadPostTaskTest, InvokesWithMovedFunctorDestroyedOnTargetThread) {
1019 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1020 background_thread->Start();
1021
1022 Event event;
1023 bool was_invoked_on_background_thread = false;
1024 DestructionFunctor functor(background_thread.get(),
1025 &was_invoked_on_background_thread, &event);
1026 background_thread->PostTask(RTC_FROM_HERE, std::move(functor));
1027 event.Wait(Event::kForever);
1028
1029 EXPECT_TRUE(was_invoked_on_background_thread);
1030}
1031
1032TEST(ThreadPostTaskTest,
1033 InvokesWithReferencedFunctorShouldCopyAndDestroyedOnTargetThread) {
1034 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1035 background_thread->Start();
1036
1037 Event event;
1038 bool was_invoked_on_background_thread = false;
1039 DestructionFunctor functor(background_thread.get(),
1040 &was_invoked_on_background_thread, &event);
1041 DestructionFunctor& functor_ref = functor;
1042 background_thread->PostTask(RTC_FROM_HERE, functor_ref);
1043 event.Wait(Event::kForever);
1044
1045 EXPECT_TRUE(was_invoked_on_background_thread);
1046}
1047
1048TEST(ThreadPostTaskTest, InvokesOnBackgroundThread) {
1049 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1050 background_thread->Start();
1051
1052 Event event;
1053 bool was_invoked_on_background_thread = false;
Niels Möller1a29a5d2021-01-18 11:35:23 +01001054 Thread* background_thread_ptr = background_thread.get();
1055 background_thread->PostTask(
1056 RTC_FROM_HERE,
1057 [background_thread_ptr, &was_invoked_on_background_thread, &event] {
1058 was_invoked_on_background_thread = background_thread_ptr->IsCurrent();
1059 event.Set();
1060 });
Henrik Boströmba4dcc32019-02-28 09:34:06 +01001061 event.Wait(Event::kForever);
1062
1063 EXPECT_TRUE(was_invoked_on_background_thread);
1064}
1065
1066TEST(ThreadPostTaskTest, InvokesAsynchronously) {
1067 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1068 background_thread->Start();
1069
1070 // The first event ensures that SendSingleMessage() is not blocking this
1071 // thread. The second event ensures that the message is processed.
1072 Event event_set_by_test_thread;
1073 Event event_set_by_background_thread;
Niels Möller1a29a5d2021-01-18 11:35:23 +01001074 background_thread->PostTask(RTC_FROM_HERE, [&event_set_by_test_thread,
1075 &event_set_by_background_thread] {
1076 WaitAndSetEvent(&event_set_by_test_thread, &event_set_by_background_thread);
1077 });
Henrik Boströmba4dcc32019-02-28 09:34:06 +01001078 event_set_by_test_thread.Set();
1079 event_set_by_background_thread.Wait(Event::kForever);
1080}
1081
1082TEST(ThreadPostTaskTest, InvokesInPostedOrder) {
1083 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1084 background_thread->Start();
1085
1086 Event first;
1087 Event second;
1088 Event third;
1089 Event fourth;
1090
Niels Möller1a29a5d2021-01-18 11:35:23 +01001091 background_thread->PostTask(
1092 RTC_FROM_HERE, [&first, &second] { WaitAndSetEvent(&first, &second); });
1093 background_thread->PostTask(
1094 RTC_FROM_HERE, [&second, &third] { WaitAndSetEvent(&second, &third); });
1095 background_thread->PostTask(
1096 RTC_FROM_HERE, [&third, &fourth] { WaitAndSetEvent(&third, &fourth); });
Henrik Boströmba4dcc32019-02-28 09:34:06 +01001097
1098 // All tasks have been posted before the first one is unblocked.
1099 first.Set();
1100 // Only if the chain is invoked in posted order will the last event be set.
1101 fourth.Wait(Event::kForever);
1102}
1103
Steve Antonbcc1a762019-12-11 11:21:53 -08001104TEST(ThreadPostDelayedTaskTest, InvokesAsynchronously) {
1105 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1106 background_thread->Start();
1107
1108 // The first event ensures that SendSingleMessage() is not blocking this
1109 // thread. The second event ensures that the message is processed.
1110 Event event_set_by_test_thread;
1111 Event event_set_by_background_thread;
1112 background_thread->PostDelayedTask(
1113 RTC_FROM_HERE,
Niels Möller1a29a5d2021-01-18 11:35:23 +01001114 [&event_set_by_test_thread, &event_set_by_background_thread] {
1115 WaitAndSetEvent(&event_set_by_test_thread,
1116 &event_set_by_background_thread);
1117 },
Steve Antonbcc1a762019-12-11 11:21:53 -08001118 /*milliseconds=*/10);
1119 event_set_by_test_thread.Set();
1120 event_set_by_background_thread.Wait(Event::kForever);
1121}
1122
1123TEST(ThreadPostDelayedTaskTest, InvokesInDelayOrder) {
Steve Anton094396f2019-12-16 00:56:02 -08001124 ScopedFakeClock clock;
Steve Antonbcc1a762019-12-11 11:21:53 -08001125 std::unique_ptr<rtc::Thread> background_thread(rtc::Thread::Create());
1126 background_thread->Start();
1127
1128 Event first;
1129 Event second;
1130 Event third;
1131 Event fourth;
1132
Niels Möller1a29a5d2021-01-18 11:35:23 +01001133 background_thread->PostDelayedTask(
1134 RTC_FROM_HERE, [&third, &fourth] { WaitAndSetEvent(&third, &fourth); },
1135 /*milliseconds=*/11);
1136 background_thread->PostDelayedTask(
1137 RTC_FROM_HERE, [&first, &second] { WaitAndSetEvent(&first, &second); },
1138 /*milliseconds=*/9);
1139 background_thread->PostDelayedTask(
1140 RTC_FROM_HERE, [&second, &third] { WaitAndSetEvent(&second, &third); },
1141 /*milliseconds=*/10);
Steve Antonbcc1a762019-12-11 11:21:53 -08001142
1143 // All tasks have been posted before the first one is unblocked.
1144 first.Set();
Steve Anton094396f2019-12-16 00:56:02 -08001145 // Only if the chain is invoked in delay order will the last event be set.
Danil Chapovalov0c626af2020-02-10 11:16:00 +01001146 clock.AdvanceTime(webrtc::TimeDelta::Millis(11));
Steve Anton094396f2019-12-16 00:56:02 -08001147 EXPECT_TRUE(fourth.Wait(0));
Steve Antonbcc1a762019-12-11 11:21:53 -08001148}
1149
Tommi6866dc72020-05-15 10:11:56 +02001150TEST(ThreadPostDelayedTaskTest, IsCurrentTaskQueue) {
1151 auto current_tq = webrtc::TaskQueueBase::Current();
1152 {
1153 std::unique_ptr<rtc::Thread> thread(rtc::Thread::Create());
1154 thread->WrapCurrent();
1155 EXPECT_EQ(webrtc::TaskQueueBase::Current(),
1156 static_cast<webrtc::TaskQueueBase*>(thread.get()));
1157 thread->UnwrapCurrent();
1158 }
1159 EXPECT_EQ(webrtc::TaskQueueBase::Current(), current_tq);
1160}
1161
Danil Chapovalov912b3b82019-11-22 15:52:40 +01001162class ThreadFactory : public webrtc::TaskQueueFactory {
1163 public:
1164 std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
1165 CreateTaskQueue(absl::string_view /* name */,
1166 Priority /*priority*/) const override {
1167 std::unique_ptr<Thread> thread = Thread::Create();
1168 thread->Start();
1169 return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
1170 thread.release());
1171 }
1172};
1173
1174using ::webrtc::TaskQueueTest;
1175
1176INSTANTIATE_TEST_SUITE_P(RtcThread,
1177 TaskQueueTest,
1178 ::testing::Values(std::make_unique<ThreadFactory>));
1179
Mirko Bonadeie10b1632018-12-11 18:43:40 +01001180} // namespace
1181} // namespace rtc