/*
 *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#include "test/single_threaded_task_queue.h"
12
13#include <memory>
14#include <utility>
15
16#include "rtc_base/checks.h"
17#include "rtc_base/numerics/safe_conversions.h"
18#include "rtc_base/time_utils.h"
19
20namespace webrtc {
21namespace test {
22
23DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::StoredTask(
24 DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId task_id,
25 std::unique_ptr<QueuedTask> task)
26 : task_id(task_id), task(std::move(task)) {}
27
28DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::~StoredTask() =
29 default;
30
31DEPRECATED_SingleThreadedTaskQueueForTesting::
32 DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name)
33 : thread_(Run, this, name), running_(true), next_task_id_(0) {
34 thread_.Start();
35}
36
37DEPRECATED_SingleThreadedTaskQueueForTesting::
38 ~DEPRECATED_SingleThreadedTaskQueueForTesting() {
39 Stop();
40}
41
42DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId
43DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayed(
44 std::unique_ptr<QueuedTask> task,
45 int64_t delay_ms) {
46 int64_t earliest_exec_time = rtc::TimeAfter(delay_ms);
47
48 rtc::CritScope lock(&cs_);
49 if (!running_)
50 return kInvalidTaskId;
51
52 TaskId id = next_task_id_++;
53
54 // Insert after any other tasks with an earlier-or-equal target time.
55 // Note: multimap has promise "The order of the key-value pairs whose keys
56 // compare equivalent is the order of insertion and does not change."
57 tasks_.emplace(std::piecewise_construct,
58 std::forward_as_tuple(earliest_exec_time),
59 std::forward_as_tuple(id, std::move(task)));
60
61 // This class is optimized for simplicty, not for performance. This will wake
62 // the thread up even if the next task in the queue is only scheduled for
63 // quite some time from now. In that case, the thread will just send itself
64 // back to sleep.
65 wake_up_.Set();
66
67 return id;
68}
69
70bool DEPRECATED_SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) {
71 rtc::CritScope lock(&cs_);
72 for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
73 if (it->second.task_id == task_id) {
74 tasks_.erase(it);
75 return true;
76 }
77 }
78 return false;
79}
80
81bool DEPRECATED_SingleThreadedTaskQueueForTesting::IsCurrent() {
82 return rtc::IsThreadRefEqual(thread_.GetThreadRef(), rtc::CurrentThreadRef());
83}
84
85bool DEPRECATED_SingleThreadedTaskQueueForTesting::IsRunning() {
86 RTC_DCHECK_RUN_ON(&owner_thread_checker_);
87 // We could check the |running_| flag here, but this is equivalent for the
88 // purposes of this function.
89 return thread_.IsRunning();
90}
91
92bool DEPRECATED_SingleThreadedTaskQueueForTesting::HasPendingTasks() const {
93 rtc::CritScope lock(&cs_);
94 return !tasks_.empty();
95}
96
97void DEPRECATED_SingleThreadedTaskQueueForTesting::Stop() {
98 RTC_DCHECK_RUN_ON(&owner_thread_checker_);
99 if (!thread_.IsRunning())
100 return;
101
102 {
103 rtc::CritScope lock(&cs_);
104 running_ = false;
105 }
106
107 wake_up_.Set();
108 thread_.Stop();
109}
110
111void DEPRECATED_SingleThreadedTaskQueueForTesting::Run(void* obj) {
112 static_cast<DEPRECATED_SingleThreadedTaskQueueForTesting*>(obj)->RunLoop();
113}
114
115void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() {
116 CurrentTaskQueueSetter set_current(this);
117 while (true) {
118 std::unique_ptr<QueuedTask> queued_task;
119
120 // An empty queue would lead to sleeping until the queue becoems non-empty.
121 // A queue where the earliest task is scheduled for later than now, will
122 // lead to sleeping until the time of the next scheduled task (or until
123 // more tasks are scheduled).
124 int wait_time = rtc::Event::kForever;
125
126 {
127 rtc::CritScope lock(&cs_);
128 if (!running_) {
129 return;
130 }
131 if (!tasks_.empty()) {
132 auto next_delayed_task = tasks_.begin();
133 int64_t earliest_exec_time = next_delayed_task->first;
134 int64_t remaining_delay_ms =
135 rtc::TimeDiff(earliest_exec_time, rtc::TimeMillis());
136 if (remaining_delay_ms <= 0) {
137 queued_task = std::move(next_delayed_task->second.task);
138 tasks_.erase(next_delayed_task);
139 } else {
140 wait_time = rtc::saturated_cast<int>(remaining_delay_ms);
141 }
142 }
143 }
144
145 if (queued_task) {
146 if (!queued_task->Run()) {
147 queued_task.release();
148 }
149 } else {
150 wake_up_.Wait(wait_time);
151 }
152 }
153}
154
155void DEPRECATED_SingleThreadedTaskQueueForTesting::Delete() {
156 Stop();
157 delete this;
158}
159
160} // namespace test
161} // namespace webrtc