/*
 * Copyright 2019 The WebRTC project authors. All Rights Reserved.
 *
 * Use of this source code is governed by a BSD-style license
 * that can be found in the LICENSE file in the root of the source
 * tree. An additional intellectual property rights grant can be found
 * in the file PATENTS. All contributing project authors may
 * be found in the AUTHORS file in the root of the source tree.
 */
10#include "test/time_controller/simulated_time_controller.h"
11
12#include <algorithm>
13#include <deque>
14#include <list>
15#include <map>
16#include <set>
17#include <string>
18#include <thread>
19#include <vector>
20
21#include "absl/memory/memory.h"
22#include "absl/strings/string_view.h"
23
24namespace webrtc {
25
26namespace sim_time_impl {
27class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase {
28 public:
29 SimulatedSequenceRunner(SimulatedTimeControllerImpl* handler,
30 absl::string_view queue_name)
31 : handler_(handler), name_(queue_name) {}
32 ~SimulatedSequenceRunner() override { handler_->Unregister(this); }
33
34 // Provides next run time.
35 Timestamp GetNextRunTime() const;
36
37 // Iterates through delayed tasks and modules and moves them to the ready set
38 // if they are supposed to execute by |at time|.
39 void UpdateReady(Timestamp at_time);
40 // Runs all ready tasks and modules and updates next run time.
41 void Run(Timestamp at_time);
42
43 // TaskQueueBase interface
44 void Delete() override;
45 // Note: PostTask is also in ProcessThread interface.
46 void PostTask(std::unique_ptr<QueuedTask> task) override;
47 void PostDelayedTask(std::unique_ptr<QueuedTask> task,
48 uint32_t milliseconds) override;
49
50 // ProcessThread interface
51 void Start() override;
52 void Stop() override;
53 void WakeUp(Module* module) override;
54 void RegisterModule(Module* module, const rtc::Location& from) override;
55 void DeRegisterModule(Module* module) override;
56
57 private:
58 Timestamp GetCurrentTime() const { return handler_->CurrentTime(); }
59 void RunReadyTasks(Timestamp at_time) RTC_LOCKS_EXCLUDED(lock_);
60 void RunReadyModules(Timestamp at_time) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
61 void UpdateNextRunTime() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
62
63 SimulatedTimeControllerImpl* const handler_;
64 const std::string name_;
65
66 rtc::CriticalSection lock_;
67
68 std::deque<std::unique_ptr<QueuedTask>> ready_tasks_ RTC_GUARDED_BY(lock_);
69 std::multimap<Timestamp, std::unique_ptr<QueuedTask>> delayed_tasks_
70 RTC_GUARDED_BY(lock_);
71
72 bool process_thread_running_ RTC_GUARDED_BY(lock_) = false;
73 std::set<Module*> stopped_modules_ RTC_GUARDED_BY(lock_);
74 std::set<Module*> ready_modules_ RTC_GUARDED_BY(lock_);
75 std::multimap<Timestamp, Module*> delayed_modules_ RTC_GUARDED_BY(lock_);
76
77 Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity();
78};
79
80Timestamp SimulatedSequenceRunner::GetNextRunTime() const {
81 rtc::CritScope lock(&lock_);
82 return next_run_time_;
83}
84
85void SimulatedSequenceRunner::UpdateReady(Timestamp at_time) {
86 rtc::CritScope lock(&lock_);
87 for (auto it = delayed_tasks_.begin();
88 it != delayed_tasks_.end() && it->first <= at_time;) {
89 ready_tasks_.emplace_back(std::move(it->second));
90 it = delayed_tasks_.erase(it);
91 }
92 for (auto it = delayed_modules_.begin();
93 it != delayed_modules_.end() && it->first <= at_time;) {
94 ready_modules_.insert(it->second);
95 it = delayed_modules_.erase(it);
96 }
97}
98
99void SimulatedSequenceRunner::Run(Timestamp at_time) {
100 RunReadyTasks(at_time);
101 rtc::CritScope lock(&lock_);
102 RunReadyModules(at_time);
103 UpdateNextRunTime();
104}
105
106void SimulatedSequenceRunner::Delete() {
107 {
108 rtc::CritScope lock(&lock_);
109 ready_tasks_.clear();
110 delayed_tasks_.clear();
111 }
112 delete this;
113}
114
115void SimulatedSequenceRunner::RunReadyTasks(Timestamp at_time) {
116 std::deque<std::unique_ptr<QueuedTask>> ready_tasks;
117 {
118 rtc::CritScope lock(&lock_);
119 ready_tasks.swap(ready_tasks_);
120 }
121 if (!ready_tasks.empty()) {
122 CurrentTaskQueueSetter set_current(this);
123 for (auto& ready : ready_tasks) {
124 bool delete_task = ready->Run();
125 if (delete_task) {
126 ready.reset();
127 } else {
128 ready.release();
129 }
130 }
131 }
132}
133
134void SimulatedSequenceRunner::RunReadyModules(Timestamp at_time) {
135 if (!ready_modules_.empty()) {
136 CurrentTaskQueueSetter set_current(this);
137 for (auto* module : ready_modules_) {
138 module->Process();
139 Timestamp next_run_time =
140 at_time + TimeDelta::ms(module->TimeUntilNextProcess());
141 delayed_modules_.emplace(next_run_time, module);
142 }
143 }
144 ready_modules_.clear();
145}
146
147void SimulatedSequenceRunner::UpdateNextRunTime() {
148 if (!ready_tasks_.empty() || !ready_modules_.empty()) {
149 next_run_time_ = Timestamp::MinusInfinity();
150 } else {
151 next_run_time_ = Timestamp::PlusInfinity();
152 if (!delayed_tasks_.empty())
153 next_run_time_ = std::min(next_run_time_, delayed_tasks_.begin()->first);
154 if (!delayed_modules_.empty())
155 next_run_time_ =
156 std::min(next_run_time_, delayed_modules_.begin()->first);
157 }
158}
159
160void SimulatedSequenceRunner::PostTask(std::unique_ptr<QueuedTask> task) {
161 rtc::CritScope lock(&lock_);
162 ready_tasks_.emplace_back(std::move(task));
163 next_run_time_ = Timestamp::MinusInfinity();
164}
165
166void SimulatedSequenceRunner::PostDelayedTask(std::unique_ptr<QueuedTask> task,
167 uint32_t milliseconds) {
168 rtc::CritScope lock(&lock_);
169 Timestamp target_time = GetCurrentTime() + TimeDelta::ms(milliseconds);
170 delayed_tasks_.emplace(target_time, std::move(task));
171 next_run_time_ = std::min(next_run_time_, target_time);
172}
173
174void SimulatedSequenceRunner::Start() {
175 std::set<Module*> starting;
176 {
177 rtc::CritScope lock(&lock_);
178 if (process_thread_running_)
179 return;
180 process_thread_running_ = true;
181 starting.swap(stopped_modules_);
182 }
183 for (auto& module : starting)
184 module->ProcessThreadAttached(this);
185
186 Timestamp at_time = GetCurrentTime();
187 rtc::CritScope lock(&lock_);
188 for (auto& module : starting)
189 delayed_modules_.insert(
190 {at_time + TimeDelta::ms(module->TimeUntilNextProcess()), module});
191 UpdateNextRunTime();
192}
193
194void SimulatedSequenceRunner::Stop() {
195 std::set<Module*> stopping;
196 {
197 rtc::CritScope lock(&lock_);
198 process_thread_running_ = false;
199
200 for (auto* ready : ready_modules_)
201 stopped_modules_.insert(ready);
202 ready_modules_.clear();
203
204 for (auto& delayed : delayed_modules_)
205 stopped_modules_.insert(delayed.second);
206 delayed_modules_.clear();
207
208 stopping = stopped_modules_;
209 }
210 for (auto& module : stopping)
211 module->ProcessThreadAttached(nullptr);
212}
213
214void SimulatedSequenceRunner::WakeUp(Module* module) {
215 rtc::CritScope lock(&lock_);
216 // If we already are planning to run this module as soon as possible, we don't
217 // need to do anything.
218 if (ready_modules_.find(module) != ready_modules_.end())
219 return;
220
221 for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) {
222 if (it->second == module) {
223 delayed_modules_.erase(it);
224 break;
225 }
226 }
227 Timestamp next_time =
228 GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess());
229 delayed_modules_.insert({next_time, module});
230 next_run_time_ = std::min(next_run_time_, next_time);
231}
232
233void SimulatedSequenceRunner::RegisterModule(Module* module,
234 const rtc::Location& from) {
235 module->ProcessThreadAttached(this);
236 rtc::CritScope lock(&lock_);
237 if (!process_thread_running_) {
238 stopped_modules_.insert(module);
239 } else {
240 Timestamp next_time =
241 GetCurrentTime() + TimeDelta::ms(module->TimeUntilNextProcess());
242 delayed_modules_.insert({next_time, module});
243 next_run_time_ = std::min(next_run_time_, next_time);
244 }
245}
246
247void SimulatedSequenceRunner::DeRegisterModule(Module* module) {
248 bool modules_running;
249 {
250 rtc::CritScope lock(&lock_);
251 if (!process_thread_running_) {
252 stopped_modules_.erase(module);
253 } else {
254 ready_modules_.erase(module);
255 for (auto it = delayed_modules_.begin(); it != delayed_modules_.end();
256 ++it) {
257 if (it->second == module) {
258 delayed_modules_.erase(it);
259 break;
260 }
261 }
262 }
263 modules_running = process_thread_running_;
264 }
265 if (modules_running)
266 module->ProcessThreadAttached(nullptr);
267}
268
269SimulatedTimeControllerImpl::SimulatedTimeControllerImpl(Timestamp start_time)
270 : thread_id_(rtc::CurrentThreadId()), current_time_(start_time) {}
271
272SimulatedTimeControllerImpl::~SimulatedTimeControllerImpl() = default;
273
274std::unique_ptr<TaskQueueBase, TaskQueueDeleter>
275SimulatedTimeControllerImpl::CreateTaskQueue(
276 absl::string_view name,
277 TaskQueueFactory::Priority priority) const {
278 // TODO(srte): Remove the const cast when the interface is made mutable.
279 auto mutable_this = const_cast<SimulatedTimeControllerImpl*>(this);
280 auto task_queue = std::unique_ptr<SimulatedSequenceRunner, TaskQueueDeleter>(
281 new SimulatedSequenceRunner(mutable_this, name));
282 rtc::CritScope lock(&mutable_this->lock_);
283 mutable_this->runners_.insert(task_queue.get());
284 return task_queue;
285}
286
287std::unique_ptr<ProcessThread> SimulatedTimeControllerImpl::CreateProcessThread(
288 const char* thread_name) {
289 rtc::CritScope lock(&lock_);
290 auto process_thread =
291 absl::make_unique<SimulatedSequenceRunner>(this, thread_name);
292 runners_.insert(process_thread.get());
293 return process_thread;
294}
295
296std::vector<SimulatedSequenceRunner*>
297SimulatedTimeControllerImpl::GetNextReadyRunner(Timestamp current_time) {
298 rtc::CritScope lock(&lock_);
299 std::vector<SimulatedSequenceRunner*> ready;
300 for (auto* runner : runners_) {
301 if (yielded_.find(runner) == yielded_.end() &&
302 runner->GetNextRunTime() <= current_time) {
303 ready.push_back(runner);
304 }
305 }
306 return ready;
307}
308
309void SimulatedTimeControllerImpl::YieldExecution() {
310 if (rtc::CurrentThreadId() == thread_id_) {
311 RTC_DCHECK_RUN_ON(&thread_checker_);
312 // When we yield, we don't want to risk executing further tasks on the
313 // currently executing task queue. If there's a ready task that also yields,
314 // it's added to this set as well and only tasks on the remaining task
315 // queues are executed.
316 auto inserted = yielded_.insert(TaskQueueBase::Current());
317 RTC_DCHECK(inserted.second);
318 RunReadyRunners();
319 yielded_.erase(inserted.first);
320 }
321}
322
323void SimulatedTimeControllerImpl::RunReadyRunners() {
324 RTC_DCHECK_RUN_ON(&thread_checker_);
325 Timestamp current_time = CurrentTime();
326 // We repeat until we have no ready left to handle tasks posted by ready
327 // runners.
328 while (true) {
329 auto ready = GetNextReadyRunner(current_time);
330 if (ready.empty())
331 break;
332 for (auto* runner : ready) {
333 runner->UpdateReady(current_time);
334 runner->Run(current_time);
335 }
336 }
337}
338
339Timestamp SimulatedTimeControllerImpl::CurrentTime() const {
340 rtc::CritScope lock(&time_lock_);
341 return current_time_;
342}
343
344Timestamp SimulatedTimeControllerImpl::NextRunTime() const {
345 Timestamp current_time = CurrentTime();
346 Timestamp next_time = Timestamp::PlusInfinity();
347 rtc::CritScope lock(&lock_);
348 for (auto* runner : runners_) {
349 Timestamp next_run_time = runner->GetNextRunTime();
350 if (next_run_time <= current_time)
351 return current_time;
352 next_time = std::min(next_time, next_run_time);
353 }
354 return next_time;
355}
356
357void SimulatedTimeControllerImpl::AdvanceTime(Timestamp target_time) {
358 rtc::CritScope time_lock(&time_lock_);
359 RTC_DCHECK(target_time >= current_time_);
360 current_time_ = target_time;
361}
362
363void SimulatedTimeControllerImpl::Unregister(SimulatedSequenceRunner* runner) {
364 rtc::CritScope lock(&lock_);
365 RTC_CHECK(runners_.erase(runner));
366}
367
368} // namespace sim_time_impl
369
370GlobalSimulatedTimeController::GlobalSimulatedTimeController(
371 Timestamp start_time)
372 : sim_clock_(start_time.us()), impl_(start_time) {
373 global_clock_.SetTimeMicros(start_time.us());
374}
375
376GlobalSimulatedTimeController::~GlobalSimulatedTimeController() = default;
377
378Clock* GlobalSimulatedTimeController::GetClock() {
379 return &sim_clock_;
380}
381
382TaskQueueFactory* GlobalSimulatedTimeController::GetTaskQueueFactory() {
383 return &impl_;
384}
385
386std::unique_ptr<ProcessThread>
387GlobalSimulatedTimeController::CreateProcessThread(const char* thread_name) {
388 return impl_.CreateProcessThread(thread_name);
389}
390
391void GlobalSimulatedTimeController::Sleep(TimeDelta duration) {
392 rtc::ScopedYieldPolicy yield_policy(&impl_);
393 Timestamp current_time = impl_.CurrentTime();
394 Timestamp target_time = current_time + duration;
395 RTC_DCHECK_EQ(current_time.us(), rtc::TimeMicros());
396 while (current_time < target_time) {
397 impl_.RunReadyRunners();
398 Timestamp next_time = std::min(impl_.NextRunTime(), target_time);
399 impl_.AdvanceTime(next_time);
400 auto delta = next_time - current_time;
401 current_time = next_time;
402 sim_clock_.AdvanceTimeMicroseconds(delta.us());
403 global_clock_.AdvanceTimeMicros(delta.us());
404 }
405}
406
407void GlobalSimulatedTimeController::InvokeWithControlledYield(
408 std::function<void()> closure) {
409 rtc::ScopedYieldPolicy yield_policy(&impl_);
410 closure();
411}
412
413// namespace sim_time_impl
414
415} // namespace webrtc