Fixing race between ~AsyncInvoker and ~AsyncClosure, using ref-counting.
The AsyncInvoker destructor waits for all invoked tasks to complete
(in other words, for all AsyncClosures to be destructed). The
AsyncClosure destructors were using an event to wake up the
AsyncInvoker destructor, but a race made it possible for this event to
be dereferenced after it had been destroyed.
This CL makes the event reference counted, such that if the
AsyncInvoker destructor runs right after an AsyncClosure decrements
"pending_invocations_", setting the event will be a no-op, and the
event will be destructed in the AsyncClosure destructor.
This CL also fixes a deadlock that may occur for "re-entrant"
invocations. The deadlock occurs if the AsyncInvoker is destroyed on
thread A while a task running on thread B AsyncInvokes another task
back on thread A.
Such an invocation was causing "pending_invocations_" to end up
negative, because an AsyncClosure that's never added to a thread's
message queue (due to the "destroying_" flag) caused the count to be
decremented but not incremented.
BUG=webrtc:7656
Review-Url: https://codereview.webrtc.org/2885143005
Cr-Commit-Position: refs/heads/master@{#19278}
diff --git a/webrtc/rtc_base/asyncinvoker-inl.h b/webrtc/rtc_base/asyncinvoker-inl.h
index 5b2cf4e..7878b15 100644
--- a/webrtc/rtc_base/asyncinvoker-inl.h
+++ b/webrtc/rtc_base/asyncinvoker-inl.h
@@ -11,10 +11,12 @@
#ifndef WEBRTC_RTC_BASE_ASYNCINVOKER_INL_H_
#define WEBRTC_RTC_BASE_ASYNCINVOKER_INL_H_
-#include "webrtc/rtc_base/atomicops.h"
#include "webrtc/rtc_base/bind.h"
#include "webrtc/rtc_base/criticalsection.h"
+#include "webrtc/rtc_base/event.h"
#include "webrtc/rtc_base/messagehandler.h"
+#include "webrtc/rtc_base/refcountedobject.h"
+#include "webrtc/rtc_base/scoped_ref_ptr.h"
#include "webrtc/rtc_base/sigslot.h"
#include "webrtc/rtc_base/thread.h"
#include "webrtc/rtc_base/thread_annotations.h"
@@ -27,7 +29,7 @@
// on the calling thread if necessary.
class AsyncClosure {
public:
- explicit AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {}
+ explicit AsyncClosure(AsyncInvoker* invoker);
virtual ~AsyncClosure();
// Runs the asynchronous task, and triggers a callback to the calling
// thread if needed. Should be called from the target thread.
@@ -35,6 +37,11 @@
protected:
AsyncInvoker* invoker_;
+ // Reference counted so that if the AsyncInvoker destructor finishes before
+ // an AsyncClosure's destructor that's about to call
+ // "invocation_complete_->Set()", it's not dereferenced after being
+ // destroyed.
+ scoped_refptr<RefCountedObject<Event>> invocation_complete_;
};
// Simple closure that doesn't trigger a callback for the calling thread.
diff --git a/webrtc/rtc_base/asyncinvoker.cc b/webrtc/rtc_base/asyncinvoker.cc
index 94abfd5..89e4e77 100644
--- a/webrtc/rtc_base/asyncinvoker.cc
+++ b/webrtc/rtc_base/asyncinvoker.cc
@@ -10,27 +10,30 @@
#include "webrtc/rtc_base/asyncinvoker.h"
-#include "webrtc/rtc_base/atomicops.h"
#include "webrtc/rtc_base/checks.h"
#include "webrtc/rtc_base/logging.h"
namespace rtc {
-AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {}
+AsyncInvoker::AsyncInvoker()
+ : pending_invocations_(0),
+ invocation_complete_(new RefCountedObject<Event>(false, false)),
+ destroying_(false) {}
AsyncInvoker::~AsyncInvoker() {
- destroying_ = true;
+ destroying_.store(true, std::memory_order_relaxed);
// Messages for this need to be cleared *before* our destructor is complete.
MessageQueueManager::Clear(this);
// And we need to wait for any invocations that are still in progress on
- // other threads.
- while (AtomicOps::AcquireLoad(&pending_invocations_)) {
+ // other threads. Using memory_order_acquire for synchronization with
+ // AsyncClosure destructors.
+ while (pending_invocations_.load(std::memory_order_acquire) > 0) {
// If the destructor was called while AsyncInvoke was being called by
// another thread, WITHIN an AsyncInvoked functor, it may do another
// Thread::Post even after we called MessageQueueManager::Clear(this). So
// we need to keep calling Clear to discard these posts.
- MessageQueueManager::Clear(this);
- invocation_complete_.Wait(Event::kForever);
+ Thread::Current()->Clear(this);
+ invocation_complete_->Wait(Event::kForever);
}
}
@@ -44,7 +47,10 @@
}
void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
- if (destroying_) return;
+ // If the destructor is waiting for invocations to finish, don't start
+ // running even more tasks.
+ if (destroying_.load(std::memory_order_relaxed))
+ return;
// Run this on |thread| to reduce the number of context switches.
if (Thread::Current() != thread) {
@@ -65,11 +71,14 @@
Thread* thread,
std::unique_ptr<AsyncClosure> closure,
uint32_t id) {
- if (destroying_) {
+ if (destroying_.load(std::memory_order_relaxed)) {
+ // Note that this may be expected, if the application is AsyncInvoking
+ // tasks that AsyncInvoke other tasks. But otherwise it indicates a race
+ // between a thread destroying the AsyncInvoker and a thread still trying
+ // to use it.
LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
return;
}
- AtomicOps::Increment(&pending_invocations_);
thread->Post(posted_from, this, id,
new ScopedMessageData<AsyncClosure>(std::move(closure)));
}
@@ -79,11 +88,11 @@
std::unique_ptr<AsyncClosure> closure,
uint32_t delay_ms,
uint32_t id) {
- if (destroying_) {
+ if (destroying_.load(std::memory_order_relaxed)) {
+ // See above comment.
LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
return;
}
- AtomicOps::Increment(&pending_invocations_);
thread->PostDelayed(posted_from, delay_ms, this, id,
new ScopedMessageData<AsyncClosure>(std::move(closure)));
}
@@ -97,7 +106,7 @@
}
bool GuardedAsyncInvoker::Flush(uint32_t id) {
- rtc::CritScope cs(&crit_);
+ CritScope cs(&crit_);
if (thread_ == nullptr)
return false;
invoker_.Flush(thread_, id);
@@ -105,15 +114,30 @@
}
void GuardedAsyncInvoker::ThreadDestroyed() {
- rtc::CritScope cs(&crit_);
+ CritScope cs(&crit_);
// We should never get more than one notification about the thread dying.
RTC_DCHECK(thread_ != nullptr);
thread_ = nullptr;
}
+AsyncClosure::AsyncClosure(AsyncInvoker* invoker)
+ : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) {
+ invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed);
+}
+
AsyncClosure::~AsyncClosure() {
- AtomicOps::Decrement(&invoker_->pending_invocations_);
- invoker_->invocation_complete_.Set();
+ // Using memory_order_release for synchronization with the AsyncInvoker
+ // destructor.
+ invoker_->pending_invocations_.fetch_sub(1, std::memory_order_release);
+
+ // After |pending_invocations_| is decremented, we may need to signal
+ // |invocation_complete_| in case the AsyncInvoker is being destroyed and
+ // waiting for pending tasks to complete.
+ //
+ // It's also possible that the destructor finishes before "Set()" is called,
+ // which is safe because the event is reference counted (and in a thread-safe
+ // way).
+ invocation_complete_->Set();
}
} // namespace rtc
diff --git a/webrtc/rtc_base/asyncinvoker.h b/webrtc/rtc_base/asyncinvoker.h
index 17d702a..455ded2 100644
--- a/webrtc/rtc_base/asyncinvoker.h
+++ b/webrtc/rtc_base/asyncinvoker.h
@@ -11,6 +11,7 @@
#ifndef WEBRTC_RTC_BASE_ASYNCINVOKER_H_
#define WEBRTC_RTC_BASE_ASYNCINVOKER_H_
+#include <atomic>
#include <memory>
#include <utility>
@@ -18,6 +19,8 @@
#include "webrtc/rtc_base/bind.h"
#include "webrtc/rtc_base/constructormagic.h"
#include "webrtc/rtc_base/event.h"
+#include "webrtc/rtc_base/refcountedobject.h"
+#include "webrtc/rtc_base/scoped_ref_ptr.h"
#include "webrtc/rtc_base/sigslot.h"
#include "webrtc/rtc_base/thread.h"
@@ -70,6 +73,20 @@
// AsyncInvoker invoker_;
// int result_;
// };
+//
+// More details about threading:
+// - It's safe to construct/destruct AsyncInvoker on different threads.
+// - It's safe to call AsyncInvoke from different threads.
+// - It's safe to call AsyncInvoke recursively from *within* a functor that's
+// being AsyncInvoked.
+// - However, it's *not* safe to call AsyncInvoke from *outside* a functor
+// that's being AsyncInvoked while the AsyncInvoker is being destroyed on
+// another thread. This is just inherently unsafe and there's no way to
+// prevent that. So, the user of this class should ensure that the start of
+// each "chain" of invocations is synchronized somehow with the AsyncInvoker's
+// destruction. This can be done by starting each chain of invocations on the
+// same thread on which it will be destroyed, or by using some other
+// synchronization method.
class AsyncInvoker : public MessageHandler {
public:
AsyncInvoker();
@@ -118,9 +135,28 @@
std::unique_ptr<AsyncClosure> closure,
uint32_t delay_ms,
uint32_t id);
- volatile int pending_invocations_ = 0;
- Event invocation_complete_;
- bool destroying_ = false;
+
+ // Used to keep track of how many invocations (AsyncClosures) are still
+ // alive, so that the destructor can wait for them to finish, as described in
+ // the class documentation.
+ //
+ // TODO(deadbeef): Using a raw std::atomic like this is prone to error and
+ // difficult to maintain. We should try to wrap this functionality in a
+ // separate class to reduce the chance of errors being introduced in the
+ // future.
+ std::atomic<int> pending_invocations_;
+
+ // Reference counted so that if the AsyncInvoker destructor finishes before
+ // an AsyncClosure's destructor that's about to call
+ // "invocation_complete_->Set()", it's not dereferenced after being
+ // destroyed.
+ scoped_refptr<RefCountedObject<Event>> invocation_complete_;
+
+ // This flag is used to ensure that if an application AsyncInvokes tasks that
+ // recursively AsyncInvoke other tasks ad infinitum, the cycle eventually
+ // terminates.
+ std::atomic<bool> destroying_;
+
friend class AsyncClosure;
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker);
@@ -149,7 +185,7 @@
bool AsyncInvoke(const Location& posted_from,
const FunctorT& functor,
uint32_t id = 0) {
- rtc::CritScope cs(&crit_);
+ CritScope cs(&crit_);
if (thread_ == nullptr)
return false;
invoker_.AsyncInvoke<ReturnT, FunctorT>(posted_from, thread_, functor, id);
@@ -163,7 +199,7 @@
const FunctorT& functor,
uint32_t delay_ms,
uint32_t id = 0) {
- rtc::CritScope cs(&crit_);
+ CritScope cs(&crit_);
if (thread_ == nullptr)
return false;
invoker_.AsyncInvokeDelayed<ReturnT, FunctorT>(posted_from, thread_,
@@ -180,7 +216,7 @@
void (HostT::*callback)(ReturnT),
HostT* callback_host,
uint32_t id = 0) {
- rtc::CritScope cs(&crit_);
+ CritScope cs(&crit_);
if (thread_ == nullptr)
return false;
invoker_.AsyncInvoke<ReturnT, FunctorT, HostT>(
@@ -198,7 +234,7 @@
void (HostT::*callback)(),
HostT* callback_host,
uint32_t id = 0) {
- rtc::CritScope cs(&crit_);
+ CritScope cs(&crit_);
if (thread_ == nullptr)
return false;
invoker_.AsyncInvoke<ReturnT, FunctorT, HostT>(
diff --git a/webrtc/rtc_base/thread_unittest.cc b/webrtc/rtc_base/thread_unittest.cc
index a8c20d1..e7701e8 100644
--- a/webrtc/rtc_base/thread_unittest.cc
+++ b/webrtc/rtc_base/thread_unittest.cc
@@ -458,19 +458,20 @@
thread->Start();
volatile bool invoker_destroyed = false;
{
+ auto functor = [&functor_started, &functor_continue, &functor_finished,
+ &invoker_destroyed] {
+ functor_started.Set();
+ functor_continue.Wait(Event::kForever);
+ rtc::Thread::Current()->SleepMs(kWaitTimeout);
+ EXPECT_FALSE(invoker_destroyed);
+ functor_finished.Set();
+ };
AsyncInvoker invoker;
- invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(),
- [&functor_started, &functor_continue,
- &functor_finished, &invoker_destroyed] {
- functor_started.Set();
- functor_continue.Wait(Event::kForever);
- rtc::Thread::Current()->SleepMs(kWaitTimeout);
- EXPECT_FALSE(invoker_destroyed);
- functor_finished.Set();
- });
+ invoker.AsyncInvoke<void>(RTC_FROM_HERE, thread.get(), functor);
functor_started.Wait(Event::kForever);
- // Allow the functor to continue and immediately destroy the invoker.
+ // Destroy the invoker while the functor is still executing (doing
+ // SleepMs).
functor_continue.Set();
}
@@ -481,6 +482,37 @@
functor_finished.Wait(Event::kForever);
}
+// Variant of the above test where the async-invoked task calls AsyncInvoke
+// *again*, for the thread on which the AsyncInvoker is currently being
+// destroyed. This shouldn't deadlock or crash; this second invocation should
+// just be ignored.
+TEST_F(AsyncInvokeTest, KillInvokerDuringExecuteWithReentrantInvoke) {
+ Event functor_started(false, false);
+ // Flag used to verify that the recursively invoked task never actually runs.
+ bool reentrant_functor_run = false;
+
+ Thread* main = Thread::Current();
+ Thread thread;
+ thread.Start();
+ {
+ AsyncInvoker invoker;
+ auto reentrant_functor = [&reentrant_functor_run] {
+ reentrant_functor_run = true;
+ };
+ auto functor = [&functor_started, &invoker, main, reentrant_functor] {
+ functor_started.Set();
+ Thread::Current()->SleepMs(kWaitTimeout);
+ invoker.AsyncInvoke<void>(RTC_FROM_HERE, main, reentrant_functor);
+ };
+ // This queues a task on |thread| to sleep for |kWaitTimeout| then queue a
+ // task on |main|. But this second queued task should never run, since the
+ // destructor will be entered before it's even invoked.
+ invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread, functor);
+ functor_started.Wait(Event::kForever);
+ }
+ EXPECT_FALSE(reentrant_functor_run);
+}
+
TEST_F(AsyncInvokeTest, Flush) {
AsyncInvoker invoker;
AtomicBool flag1;