Rewrite TaskQueueTest.PostALot
Avoid waiting while executing a task, to discourage blocking.
Fix access to the tasks_cleaned_up counter: after the TaskQueue is
destroyed, there is no guarantee that task destructors run sequentially,
nor that all pending tasks have been destroyed by that moment.
Instead, verify that all posted tasks are eventually destroyed.
Bug: None
Change-Id: I4cfc97ac0787fe2d0b9d2f0d712a37ae0ca9e1aa
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/140288
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28208}
diff --git a/api/task_queue/task_queue_test.cc b/api/task_queue/task_queue_test.cc
index 8d02ed6..8cf59ce 100644
--- a/api/task_queue/task_queue_test.cc
+++ b/api/task_queue/task_queue_test.cc
@@ -12,6 +12,7 @@
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "rtc_base/event.h"
+#include "rtc_base/ref_counter.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
@@ -186,37 +187,53 @@
EXPECT_TRUE(event.Wait(1000));
}
-// Tests posting more messages than a queue can queue up.
-// In situations like that, tasks will get dropped.
TEST_P(TaskQueueTest, PostALot) {
+ // Waits until DecrementCount is called |initial_count| times. Thread safe.
+ class BlockingCounter {
+ public:
+ explicit BlockingCounter(int initial_count) : count_(initial_count) {}
+
+ void DecrementCount() {
+ if (count_.DecRef() == rtc::RefCountReleaseStatus::kDroppedLastRef) {
+ event_.Set();
+ }
+ }
+ bool Wait(int give_up_after_ms) { return event_.Wait(give_up_after_ms); }
+
+ private:
+ webrtc_impl::RefCounter count_;
+ rtc::Event event_;
+ };
+
std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
- // To destruct the event after the queue has gone out of scope.
- rtc::Event event;
+ static constexpr int kTaskCount = 0xffff;
+ rtc::Event posting_done;
+ BlockingCounter all_destroyed(kTaskCount);
int tasks_executed = 0;
- int tasks_cleaned_up = 0;
- static const int kTaskCount = 0xffff;
+ auto task_queue = CreateTaskQueue(factory, "PostALot");
- {
- auto queue = CreateTaskQueue(factory, "PostALot");
+ task_queue->PostTask(ToQueuedTask([&] {
+ // Post tasks from the queue to guarantee that the 1st task won't be
+ // executed before the last one is posted.
+ for (int i = 0; i < kTaskCount; ++i) {
+ task_queue->PostTask(ToQueuedTask(
+ [&] { ++tasks_executed; }, [&] { all_destroyed.DecrementCount(); }));
+ }
- // On linux, the limit of pending bytes in the pipe buffer is 0xffff.
- // So here we post a total of 0xffff+1 messages, which triggers a failure
- // case inside of the libevent queue implementation.
+ posting_done.Set();
+ }));
- queue->PostTask(ToQueuedTask([&event] {
- rtc::ScopedAllowBaseSyncPrimitivesForTesting allow_base_sync_primitives;
- event.Wait(rtc::Event::kForever);
- }));
- for (int i = 0; i < kTaskCount; ++i)
- queue->PostTask(
- ToQueuedTask([&tasks_executed] { ++tasks_executed; },
- [&tasks_cleaned_up] { ++tasks_cleaned_up; }));
- event.Set(); // Unblock the first task.
- }
+ // Before destroying the task queue wait until all child tasks are posted.
+ EXPECT_TRUE(posting_done.Wait(1000));
+ // Destroy the task queue.
+ task_queue = nullptr;
- EXPECT_GE(tasks_cleaned_up, tasks_executed);
- EXPECT_EQ(tasks_cleaned_up, kTaskCount);
+ // Expect all tasks to be destroyed eventually. In some task queue
+ // implementations that might happen on a different thread after the task
+ // queue is destroyed.
+ EXPECT_TRUE(all_destroyed.Wait(1000));
+ EXPECT_LE(tasks_executed, kTaskCount);
}
// Test posting two tasks that have shared state not protected by a