blob: a143e25f19341bd2d3368c991aca8ab9c1e4d418 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2014 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/asyncinvoker.h"
12
deadbeef162cb532017-02-23 17:10:07 -080013#include "webrtc/base/atomicops.h"
Magnus Jedverta1f590f2015-08-20 16:42:42 +020014#include "webrtc/base/checks.h"
Per33544192015-04-02 12:30:51 +020015#include "webrtc/base/logging.h"
16
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000017namespace rtc {
18
deadbeef162cb532017-02-23 17:10:07 -080019AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000020
21AsyncInvoker::~AsyncInvoker() {
22 destroying_ = true;
23 SignalInvokerDestroyed();
24 // Messages for this need to be cleared *before* our destructor is complete.
25 MessageQueueManager::Clear(this);
deadbeef162cb532017-02-23 17:10:07 -080026 // And we need to wait for any invocations that are still in progress on
27 // other threads.
28 while (AtomicOps::AcquireLoad(&pending_invocations_)) {
29 // If the destructor was called while AsyncInvoke was being called by
30 // another thread, WITHIN an AsyncInvoked functor, it may do another
31 // Thread::Post even after we called MessageQueueManager::Clear(this). So
32 // we need to keep calling Clear to discard these posts.
33 MessageQueueManager::Clear(this);
34 invocation_complete_.Wait(Event::kForever);
35 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000036}
37
38void AsyncInvoker::OnMessage(Message* msg) {
39 // Get the AsyncClosure shared ptr from this message's data.
deadbeefa8bc1a12017-02-17 18:06:26 -080040 ScopedMessageData<AsyncClosure>* data =
41 static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000042 // Execute the closure and trigger the return message if needed.
deadbeefa8bc1a12017-02-17 18:06:26 -080043 data->inner_data().Execute();
44 delete data;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000045}
46
Peter Boström0c4e06b2015-10-07 12:23:21 +020047void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000048 if (destroying_) return;
49
50 // Run this on |thread| to reduce the number of context switches.
51 if (Thread::Current() != thread) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070052 thread->Invoke<void>(RTC_FROM_HERE,
53 Bind(&AsyncInvoker::Flush, this, thread, id));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000054 return;
55 }
56
57 MessageList removed;
58 thread->Clear(this, id, &removed);
59 for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) {
60 // This message was pending on this thread, so run it now.
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070061 thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000062 }
63}
64
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070065void AsyncInvoker::DoInvoke(const Location& posted_from,
66 Thread* thread,
deadbeefa8bc1a12017-02-17 18:06:26 -080067 std::unique_ptr<AsyncClosure> closure,
Peter Boström0c4e06b2015-10-07 12:23:21 +020068 uint32_t id) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000069 if (destroying_) {
70 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000071 return;
72 }
deadbeef162cb532017-02-23 17:10:07 -080073 AtomicOps::Increment(&pending_invocations_);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070074 thread->Post(posted_from, this, id,
deadbeefa8bc1a12017-02-17 18:06:26 -080075 new ScopedMessageData<AsyncClosure>(std::move(closure)));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000076}
77
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070078void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
79 Thread* thread,
deadbeefa8bc1a12017-02-17 18:06:26 -080080 std::unique_ptr<AsyncClosure> closure,
Peter Boström0c4e06b2015-10-07 12:23:21 +020081 uint32_t delay_ms,
82 uint32_t id) {
Guo-wei Shiehdc13abc2015-06-18 14:44:41 -070083 if (destroying_) {
84 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
85 return;
86 }
deadbeef162cb532017-02-23 17:10:07 -080087 AtomicOps::Increment(&pending_invocations_);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070088 thread->PostDelayed(posted_from, delay_ms, this, id,
deadbeefa8bc1a12017-02-17 18:06:26 -080089 new ScopedMessageData<AsyncClosure>(std::move(closure)));
Guo-wei Shiehdc13abc2015-06-18 14:44:41 -070090}
91
Magnus Jedverta1f590f2015-08-20 16:42:42 +020092GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) {
93 thread_->SignalQueueDestroyed.connect(this,
94 &GuardedAsyncInvoker::ThreadDestroyed);
95}
96
97GuardedAsyncInvoker::~GuardedAsyncInvoker() {
98}
99
Peter Boström0c4e06b2015-10-07 12:23:21 +0200100bool GuardedAsyncInvoker::Flush(uint32_t id) {
Magnus Jedverta1f590f2015-08-20 16:42:42 +0200101 rtc::CritScope cs(&crit_);
102 if (thread_ == nullptr)
103 return false;
104 invoker_.Flush(thread_, id);
105 return true;
106}
107
108void GuardedAsyncInvoker::ThreadDestroyed() {
109 rtc::CritScope cs(&crit_);
110 // We should never get more than one notification about the thread dying.
henrikg91d6ede2015-09-17 00:24:34 -0700111 RTC_DCHECK(thread_ != nullptr);
Magnus Jedverta1f590f2015-08-20 16:42:42 +0200112 thread_ = nullptr;
113}
114
deadbeef162cb532017-02-23 17:10:07 -0800115AsyncClosure::~AsyncClosure() {
116 AtomicOps::Decrement(&invoker_->pending_invocations_);
117 invoker_->invocation_complete_.Set();
118}
119
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700120NotifyingAsyncClosureBase::NotifyingAsyncClosureBase(
121 AsyncInvoker* invoker,
122 const Location& callback_posted_from,
123 Thread* calling_thread)
deadbeef162cb532017-02-23 17:10:07 -0800124 : AsyncClosure(invoker),
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700125 callback_posted_from_(callback_posted_from),
126 calling_thread_(calling_thread) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000127 calling_thread->SignalQueueDestroyed.connect(
128 this, &NotifyingAsyncClosureBase::CancelCallback);
129 invoker->SignalInvokerDestroyed.connect(
130 this, &NotifyingAsyncClosureBase::CancelCallback);
131}
132
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000133NotifyingAsyncClosureBase::~NotifyingAsyncClosureBase() {
134 disconnect_all();
135}
136
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000137void NotifyingAsyncClosureBase::TriggerCallback() {
138 CritScope cs(&crit_);
139 if (!CallbackCanceled() && !callback_.empty()) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700140 invoker_->AsyncInvoke<void>(callback_posted_from_, calling_thread_,
141 callback_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000142 }
143}
144
145void NotifyingAsyncClosureBase::CancelCallback() {
146 // If the callback is triggering when this is called, block the
147 // destructor of the dying object here by waiting until the callback
148 // is done triggering.
149 CritScope cs(&crit_);
deadbeef37f5ecf2017-02-27 14:06:41 -0800150 // calling_thread_ == nullptr means do not trigger the callback.
151 calling_thread_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000152}
153
154} // namespace rtc