Fixing potential AsyncInvoker deadlock that occurs for "reentrant" invocations.
The deadlock occurs if the AsyncInvoker is destroyed on thread A while
a task on thread B is running, which AsyncInvokes a task back on thread
A.
This was causing pending_invocations_ to end up negative, because
an AsyncClosure that's never added to a thread's message queue (due to
the "destroying_" flag) caused the count to be decremented but not
incremented.
BUG=None
Review-Url: https://codereview.webrtc.org/2885143006
Cr-Commit-Position: refs/heads/master@{#18225}
diff --git a/webrtc/base/asyncinvoker-inl.h b/webrtc/base/asyncinvoker-inl.h
index 5f7cd49..a1c2c8e 100644
--- a/webrtc/base/asyncinvoker-inl.h
+++ b/webrtc/base/asyncinvoker-inl.h
@@ -27,7 +27,7 @@
// on the calling thread if necessary.
class AsyncClosure {
public:
- explicit AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {}
+ explicit AsyncClosure(AsyncInvoker* invoker);
virtual ~AsyncClosure();
// Runs the asynchronous task, and triggers a callback to the calling
// thread if needed. Should be called from the target thread.
diff --git a/webrtc/base/asyncinvoker.cc b/webrtc/base/asyncinvoker.cc
index bfd1317..d258309 100644
--- a/webrtc/base/asyncinvoker.cc
+++ b/webrtc/base/asyncinvoker.cc
@@ -19,7 +19,7 @@
AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {}
AsyncInvoker::~AsyncInvoker() {
- destroying_ = true;
+ AtomicOps::Increment(&destroying_);
// Messages for this need to be cleared *before* our destructor is complete.
MessageQueueManager::Clear(this);
// And we need to wait for any invocations that are still in progress on
@@ -44,7 +44,8 @@
}
void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
- if (destroying_) return;
+ if (AtomicOps::AcquireLoad(&destroying_))
+ return;
// Run this on |thread| to reduce the number of context switches.
if (Thread::Current() != thread) {
@@ -65,11 +66,10 @@
Thread* thread,
std::unique_ptr<AsyncClosure> closure,
uint32_t id) {
- if (destroying_) {
+ if (AtomicOps::AcquireLoad(&destroying_)) {
LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
return;
}
- AtomicOps::Increment(&pending_invocations_);
thread->Post(posted_from, this, id,
new ScopedMessageData<AsyncClosure>(std::move(closure)));
}
@@ -79,11 +79,10 @@
std::unique_ptr<AsyncClosure> closure,
uint32_t delay_ms,
uint32_t id) {
- if (destroying_) {
+ if (AtomicOps::AcquireLoad(&destroying_)) {
LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
return;
}
- AtomicOps::Increment(&pending_invocations_);
thread->PostDelayed(posted_from, delay_ms, this, id,
new ScopedMessageData<AsyncClosure>(std::move(closure)));
}
@@ -111,6 +110,10 @@
thread_ = nullptr;
}
+AsyncClosure::AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {
+ AtomicOps::Increment(&invoker_->pending_invocations_);
+}
+
AsyncClosure::~AsyncClosure() {
AtomicOps::Decrement(&invoker_->pending_invocations_);
invoker_->invocation_complete_.Set();
diff --git a/webrtc/base/asyncinvoker.h b/webrtc/base/asyncinvoker.h
index 5414867..0684f4c 100644
--- a/webrtc/base/asyncinvoker.h
+++ b/webrtc/base/asyncinvoker.h
@@ -120,7 +120,7 @@
uint32_t id);
volatile int pending_invocations_ = 0;
Event invocation_complete_;
- bool destroying_ = false;
+ int destroying_ = 0;
friend class AsyncClosure;
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker);
diff --git a/webrtc/base/thread_unittest.cc b/webrtc/base/thread_unittest.cc
index c143120..c8f9e35 100644
--- a/webrtc/base/thread_unittest.cc
+++ b/webrtc/base/thread_unittest.cc
@@ -104,7 +104,7 @@
Socket* socket_;
};
-class CustomThread : public rtc::Thread {
+class CustomThread : public Thread {
public:
CustomThread() {}
virtual ~CustomThread() { Stop(); }
@@ -150,7 +150,7 @@
// Using std::atomic<bool> or std::atomic_flag in C++11 is probably
// the right thing to do, but those features are not yet allowed. Or
-// rtc::AtomicInt, if/when that is added. Since the use isn't
+// AtomicInt, if/when that is added. Since the use isn't
// performance critical, use a plain critical section for the time
// being.
@@ -451,27 +451,23 @@
// executing, and then to wait for it to finish, ensuring the "EXPECT_FALSE"
// is run.
Event functor_started(false, false);
- Event functor_continue(false, false);
Event functor_finished(false, false);
Thread thread;
thread.Start();
volatile bool invoker_destroyed = false;
{
+ auto functor = [&functor_started, &functor_finished, &invoker_destroyed] {
+ functor_started.Set();
+ Thread::Current()->SleepMs(kWaitTimeout);
+ EXPECT_FALSE(invoker_destroyed);
+ functor_finished.Set();
+ };
AsyncInvoker invoker;
- invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread,
- [&functor_started, &functor_continue,
- &functor_finished, &invoker_destroyed] {
- functor_started.Set();
- functor_continue.Wait(Event::kForever);
- rtc::Thread::Current()->SleepMs(kWaitTimeout);
- EXPECT_FALSE(invoker_destroyed);
- functor_finished.Set();
- });
+ invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread, functor);
functor_started.Wait(Event::kForever);
-
- // Allow the functor to continue and immediately destroy the invoker.
- functor_continue.Set();
+ // Destroy the invoker while the functor is still executing (doing
+ // SleepMs).
}
// If the destructor DIDN'T wait for the functor to finish executing, it will
@@ -481,6 +477,35 @@
functor_finished.Wait(Event::kForever);
}
+// Variant of the above test where the async-invoked task calls AsyncInvoke
+// again, for the thread on which the AsyncInvoker is currently being
+// destroyed. This shouldn't deadlock or crash; this second invocation should
+// just be ignored.
+TEST_F(AsyncInvokeTest, KillInvokerDuringExecuteWithReentrantInvoke) {
+ Event functor_started(false, false);
+ bool reentrant_functor_run = false;
+
+ Thread* main = Thread::Current();
+ Thread thread;
+ thread.Start();
+ {
+ AsyncInvoker invoker;
+ auto reentrant_functor = [&reentrant_functor_run] {
+ reentrant_functor_run = true;
+ };
+ auto functor = [&functor_started, &invoker, main, reentrant_functor] {
+ functor_started.Set();
+ Thread::Current()->SleepMs(kWaitTimeout);
+ invoker.AsyncInvoke<void>(RTC_FROM_HERE, main, reentrant_functor);
+ };
+ // This queues a task on |thread| to sleep for |kWaitTimeout| then queue a
+ // task on |main|. But this second queued task should never run.
+ invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread, functor);
+ functor_started.Wait(Event::kForever);
+ }
+ EXPECT_FALSE(reentrant_functor_run);
+}
+
TEST_F(AsyncInvokeTest, Flush) {
AsyncInvoker invoker;
AtomicBool flag1;