blob: 05f86b4e973ed09790f9b7d373b1345479df86c3 [file] [log] [blame]
Sebastian Jansson0d617cc2019-03-22 15:22:16 +01001/*
2 * Copyright 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10#include "test/time_controller/simulated_time_controller.h"
11
12#include <algorithm>
13#include <deque>
14#include <list>
15#include <map>
Mirko Bonadei317a1f02019-09-17 17:06:18 +020016#include <memory>
Sebastian Jansson0d617cc2019-03-22 15:22:16 +010017#include <string>
18#include <thread>
19#include <vector>
20
Sebastian Jansson0d617cc2019-03-22 15:22:16 +010021#include "absl/strings/string_view.h"
22
23namespace webrtc {
Sebastian Jansson7b6add32019-03-29 10:34:26 +010024namespace {
// Helper function to remove from a std container by value.
//
// Erases the first element of |vec| that compares equal to |val|, if any.
// |val| is taken by const reference so that the element type is not copied
// just to perform the lookup. Returns true if an element was erased and false
// if no element matched (in which case |vec| is left untouched).
template <class C>
bool RemoveByValue(C& vec, const typename C::value_type& val) {
  const auto it = std::find(vec.begin(), vec.end(), val);
  if (it == vec.end())
    return false;
  // |val| is not used after this point, so it is fine even if it refers to
  // the element being erased.
  vec.erase(it);
  return true;
}
34} // namespace
Sebastian Jansson0d617cc2019-03-22 15:22:16 +010035
36namespace sim_time_impl {
37class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase {
38 public:
39 SimulatedSequenceRunner(SimulatedTimeControllerImpl* handler,
40 absl::string_view queue_name)
41 : handler_(handler), name_(queue_name) {}
42 ~SimulatedSequenceRunner() override { handler_->Unregister(this); }
43
44 // Provides next run time.
45 Timestamp GetNextRunTime() const;
46
47 // Iterates through delayed tasks and modules and moves them to the ready set
48 // if they are supposed to execute by |at time|.
49 void UpdateReady(Timestamp at_time);
50 // Runs all ready tasks and modules and updates next run time.
51 void Run(Timestamp at_time);
52
53 // TaskQueueBase interface
54 void Delete() override;
55 // Note: PostTask is also in ProcessThread interface.
56 void PostTask(std::unique_ptr<QueuedTask> task) override;
57 void PostDelayedTask(std::unique_ptr<QueuedTask> task,
58 uint32_t milliseconds) override;
59
60 // ProcessThread interface
61 void Start() override;
62 void Stop() override;
63 void WakeUp(Module* module) override;
64 void RegisterModule(Module* module, const rtc::Location& from) override;
65 void DeRegisterModule(Module* module) override;
Sebastian Jansson7b6add32019-03-29 10:34:26 +010066 // Promoted to public for use in SimulatedTimeControllerImpl::YieldExecution.
67 using CurrentTaskQueueSetter = TaskQueueBase::CurrentTaskQueueSetter;
Sebastian Jansson0d617cc2019-03-22 15:22:16 +010068
69 private:
70 Timestamp GetCurrentTime() const { return handler_->CurrentTime(); }
71 void RunReadyTasks(Timestamp at_time) RTC_LOCKS_EXCLUDED(lock_);
72 void RunReadyModules(Timestamp at_time) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
73 void UpdateNextRunTime() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
Sebastian Jansson7b6add32019-03-29 10:34:26 +010074 Timestamp GetNextTime(Module* module, Timestamp at_time);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +010075
76 SimulatedTimeControllerImpl* const handler_;
77 const std::string name_;
78
79 rtc::CriticalSection lock_;
80
81 std::deque<std::unique_ptr<QueuedTask>> ready_tasks_ RTC_GUARDED_BY(lock_);
Sebastian Jansson7b6add32019-03-29 10:34:26 +010082 std::map<Timestamp, std::vector<std::unique_ptr<QueuedTask>>> delayed_tasks_
Sebastian Jansson0d617cc2019-03-22 15:22:16 +010083 RTC_GUARDED_BY(lock_);
84
85 bool process_thread_running_ RTC_GUARDED_BY(lock_) = false;
Sebastian Jansson7b6add32019-03-29 10:34:26 +010086 std::vector<Module*> stopped_modules_ RTC_GUARDED_BY(lock_);
87 std::vector<Module*> ready_modules_ RTC_GUARDED_BY(lock_);
88 std::map<Timestamp, std::list<Module*>> delayed_modules_
89 RTC_GUARDED_BY(lock_);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +010090
91 Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity();
92};
93
94Timestamp SimulatedSequenceRunner::GetNextRunTime() const {
95 rtc::CritScope lock(&lock_);
96 return next_run_time_;
97}
98
99void SimulatedSequenceRunner::UpdateReady(Timestamp at_time) {
100 rtc::CritScope lock(&lock_);
101 for (auto it = delayed_tasks_.begin();
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100102 it != delayed_tasks_.end() && it->first <= at_time;
103 it = delayed_tasks_.erase(it)) {
104 for (auto& task : it->second) {
105 ready_tasks_.emplace_back(std::move(task));
106 }
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100107 }
108 for (auto it = delayed_modules_.begin();
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100109 it != delayed_modules_.end() && it->first <= at_time;
110 it = delayed_modules_.erase(it)) {
111 for (auto module : it->second) {
112 ready_modules_.push_back(module);
113 }
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100114 }
115}
116
117void SimulatedSequenceRunner::Run(Timestamp at_time) {
118 RunReadyTasks(at_time);
119 rtc::CritScope lock(&lock_);
120 RunReadyModules(at_time);
121 UpdateNextRunTime();
122}
123
124void SimulatedSequenceRunner::Delete() {
125 {
126 rtc::CritScope lock(&lock_);
127 ready_tasks_.clear();
128 delayed_tasks_.clear();
129 }
130 delete this;
131}
132
133void SimulatedSequenceRunner::RunReadyTasks(Timestamp at_time) {
134 std::deque<std::unique_ptr<QueuedTask>> ready_tasks;
135 {
136 rtc::CritScope lock(&lock_);
137 ready_tasks.swap(ready_tasks_);
138 }
139 if (!ready_tasks.empty()) {
140 CurrentTaskQueueSetter set_current(this);
141 for (auto& ready : ready_tasks) {
142 bool delete_task = ready->Run();
143 if (delete_task) {
144 ready.reset();
145 } else {
146 ready.release();
147 }
148 }
149 }
150}
151
152void SimulatedSequenceRunner::RunReadyModules(Timestamp at_time) {
153 if (!ready_modules_.empty()) {
154 CurrentTaskQueueSetter set_current(this);
155 for (auto* module : ready_modules_) {
156 module->Process();
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100157 delayed_modules_[GetNextTime(module, at_time)].push_back(module);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100158 }
159 }
160 ready_modules_.clear();
161}
162
163void SimulatedSequenceRunner::UpdateNextRunTime() {
164 if (!ready_tasks_.empty() || !ready_modules_.empty()) {
165 next_run_time_ = Timestamp::MinusInfinity();
166 } else {
167 next_run_time_ = Timestamp::PlusInfinity();
168 if (!delayed_tasks_.empty())
169 next_run_time_ = std::min(next_run_time_, delayed_tasks_.begin()->first);
170 if (!delayed_modules_.empty())
171 next_run_time_ =
172 std::min(next_run_time_, delayed_modules_.begin()->first);
173 }
174}
175
176void SimulatedSequenceRunner::PostTask(std::unique_ptr<QueuedTask> task) {
177 rtc::CritScope lock(&lock_);
178 ready_tasks_.emplace_back(std::move(task));
179 next_run_time_ = Timestamp::MinusInfinity();
180}
181
182void SimulatedSequenceRunner::PostDelayedTask(std::unique_ptr<QueuedTask> task,
183 uint32_t milliseconds) {
184 rtc::CritScope lock(&lock_);
185 Timestamp target_time = GetCurrentTime() + TimeDelta::ms(milliseconds);
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100186 delayed_tasks_[target_time].push_back(std::move(task));
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100187 next_run_time_ = std::min(next_run_time_, target_time);
188}
189
190void SimulatedSequenceRunner::Start() {
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100191 std::vector<Module*> starting;
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100192 {
193 rtc::CritScope lock(&lock_);
194 if (process_thread_running_)
195 return;
196 process_thread_running_ = true;
197 starting.swap(stopped_modules_);
198 }
199 for (auto& module : starting)
200 module->ProcessThreadAttached(this);
201
202 Timestamp at_time = GetCurrentTime();
203 rtc::CritScope lock(&lock_);
204 for (auto& module : starting)
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100205 delayed_modules_[GetNextTime(module, at_time)].push_back(module);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100206 UpdateNextRunTime();
207}
208
209void SimulatedSequenceRunner::Stop() {
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100210 std::vector<Module*> stopping;
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100211 {
212 rtc::CritScope lock(&lock_);
213 process_thread_running_ = false;
214
215 for (auto* ready : ready_modules_)
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100216 stopped_modules_.push_back(ready);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100217 ready_modules_.clear();
218
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100219 for (auto& delayed : delayed_modules_) {
220 for (auto mod : delayed.second)
221 stopped_modules_.push_back(mod);
222 }
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100223 delayed_modules_.clear();
224
225 stopping = stopped_modules_;
226 }
227 for (auto& module : stopping)
228 module->ProcessThreadAttached(nullptr);
229}
230
231void SimulatedSequenceRunner::WakeUp(Module* module) {
232 rtc::CritScope lock(&lock_);
233 // If we already are planning to run this module as soon as possible, we don't
234 // need to do anything.
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100235 for (auto mod : ready_modules_)
236 if (mod == module)
237 return;
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100238
239 for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) {
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100240 if (RemoveByValue(it->second, module))
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100241 break;
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100242 }
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100243 Timestamp next_time = GetNextTime(module, GetCurrentTime());
244 delayed_modules_[next_time].push_back(module);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100245 next_run_time_ = std::min(next_run_time_, next_time);
246}
247
248void SimulatedSequenceRunner::RegisterModule(Module* module,
249 const rtc::Location& from) {
250 module->ProcessThreadAttached(this);
251 rtc::CritScope lock(&lock_);
252 if (!process_thread_running_) {
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100253 stopped_modules_.push_back(module);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100254 } else {
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100255 Timestamp next_time = GetNextTime(module, GetCurrentTime());
256 delayed_modules_[next_time].push_back(module);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100257 next_run_time_ = std::min(next_run_time_, next_time);
258 }
259}
260
261void SimulatedSequenceRunner::DeRegisterModule(Module* module) {
262 bool modules_running;
263 {
264 rtc::CritScope lock(&lock_);
265 if (!process_thread_running_) {
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100266 RemoveByValue(stopped_modules_, module);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100267 } else {
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100268 bool removed = RemoveByValue(ready_modules_, module);
269 if (!removed) {
270 for (auto& pair : delayed_modules_) {
271 if (RemoveByValue(pair.second, module))
272 break;
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100273 }
274 }
275 }
276 modules_running = process_thread_running_;
277 }
278 if (modules_running)
279 module->ProcessThreadAttached(nullptr);
280}
281
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100282Timestamp SimulatedSequenceRunner::GetNextTime(Module* module,
283 Timestamp at_time) {
284 CurrentTaskQueueSetter set_current(this);
285 return at_time + TimeDelta::ms(module->TimeUntilNextProcess());
286}
287
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100288SimulatedTimeControllerImpl::SimulatedTimeControllerImpl(Timestamp start_time)
289 : thread_id_(rtc::CurrentThreadId()), current_time_(start_time) {}
290
291SimulatedTimeControllerImpl::~SimulatedTimeControllerImpl() = default;
292
293std::unique_ptr<TaskQueueBase, TaskQueueDeleter>
294SimulatedTimeControllerImpl::CreateTaskQueue(
295 absl::string_view name,
296 TaskQueueFactory::Priority priority) const {
297 // TODO(srte): Remove the const cast when the interface is made mutable.
298 auto mutable_this = const_cast<SimulatedTimeControllerImpl*>(this);
299 auto task_queue = std::unique_ptr<SimulatedSequenceRunner, TaskQueueDeleter>(
300 new SimulatedSequenceRunner(mutable_this, name));
301 rtc::CritScope lock(&mutable_this->lock_);
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100302 mutable_this->runners_.push_back(task_queue.get());
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100303 return task_queue;
304}
305
306std::unique_ptr<ProcessThread> SimulatedTimeControllerImpl::CreateProcessThread(
307 const char* thread_name) {
308 rtc::CritScope lock(&lock_);
309 auto process_thread =
Mirko Bonadei317a1f02019-09-17 17:06:18 +0200310 std::make_unique<SimulatedSequenceRunner>(this, thread_name);
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100311 runners_.push_back(process_thread.get());
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100312 return process_thread;
313}
314
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100315void SimulatedTimeControllerImpl::YieldExecution() {
316 if (rtc::CurrentThreadId() == thread_id_) {
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100317 TaskQueueBase* yielding_from = TaskQueueBase::Current();
318 // Since we might continue execution on a process thread, we should reset
319 // the thread local task queue reference. This ensures that thread checkers
320 // won't think we are executing on the yielding task queue. It also ensure
321 // that TaskQueueBase::Current() won't return the yielding task queue.
322 SimulatedSequenceRunner::CurrentTaskQueueSetter reset_queue(nullptr);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100323 RTC_DCHECK_RUN_ON(&thread_checker_);
324 // When we yield, we don't want to risk executing further tasks on the
325 // currently executing task queue. If there's a ready task that also yields,
326 // it's added to this set as well and only tasks on the remaining task
327 // queues are executed.
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100328 auto inserted = yielded_.insert(yielding_from);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100329 RTC_DCHECK(inserted.second);
330 RunReadyRunners();
331 yielded_.erase(inserted.first);
332 }
333}
334
335void SimulatedTimeControllerImpl::RunReadyRunners() {
336 RTC_DCHECK_RUN_ON(&thread_checker_);
Sebastian Jansson1b408232019-04-12 15:39:38 +0200337 rtc::CritScope lock(&lock_);
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100338 RTC_DCHECK_EQ(rtc::CurrentThreadId(), thread_id_);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100339 Timestamp current_time = CurrentTime();
Sebastian Jansson1b408232019-04-12 15:39:38 +0200340 // Clearing |ready_runners_| in case this is a recursive call:
341 // RunReadyRunners -> Run -> Event::Wait -> Yield ->RunReadyRunners
342 ready_runners_.clear();
343
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100344 // We repeat until we have no ready left to handle tasks posted by ready
345 // runners.
346 while (true) {
Sebastian Jansson76540812019-04-11 17:48:30 +0200347 for (auto* runner : runners_) {
348 if (yielded_.find(runner) == yielded_.end() &&
349 runner->GetNextRunTime() <= current_time) {
350 ready_runners_.push_back(runner);
351 }
352 }
353 if (ready_runners_.empty())
354 return;
355 while (!ready_runners_.empty()) {
356 auto* runner = ready_runners_.front();
357 ready_runners_.pop_front();
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100358 runner->UpdateReady(current_time);
Sebastian Jansson76540812019-04-11 17:48:30 +0200359 // Note that the Run function might indirectly cause a call to
360 // Unregister() which will recursively grab |lock_| again to remove items
361 // from |ready_runners_|.
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100362 runner->Run(current_time);
363 }
364 }
365}
366
367Timestamp SimulatedTimeControllerImpl::CurrentTime() const {
368 rtc::CritScope lock(&time_lock_);
369 return current_time_;
370}
371
372Timestamp SimulatedTimeControllerImpl::NextRunTime() const {
373 Timestamp current_time = CurrentTime();
374 Timestamp next_time = Timestamp::PlusInfinity();
375 rtc::CritScope lock(&lock_);
376 for (auto* runner : runners_) {
377 Timestamp next_run_time = runner->GetNextRunTime();
378 if (next_run_time <= current_time)
379 return current_time;
380 next_time = std::min(next_time, next_run_time);
381 }
382 return next_time;
383}
384
385void SimulatedTimeControllerImpl::AdvanceTime(Timestamp target_time) {
386 rtc::CritScope time_lock(&time_lock_);
Sebastian Jansson5a000162019-04-12 11:21:32 +0200387 RTC_DCHECK_GE(target_time, current_time_);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100388 current_time_ = target_time;
389}
390
391void SimulatedTimeControllerImpl::Unregister(SimulatedSequenceRunner* runner) {
392 rtc::CritScope lock(&lock_);
Sebastian Jansson7b6add32019-03-29 10:34:26 +0100393 bool removed = RemoveByValue(runners_, runner);
394 RTC_CHECK(removed);
Sebastian Jansson76540812019-04-11 17:48:30 +0200395 RemoveByValue(ready_runners_, runner);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100396}
397
398} // namespace sim_time_impl
399
400GlobalSimulatedTimeController::GlobalSimulatedTimeController(
401 Timestamp start_time)
402 : sim_clock_(start_time.us()), impl_(start_time) {
Sebastian Janssond624c392019-04-17 10:36:03 +0200403 global_clock_.SetTime(start_time);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100404}
405
406GlobalSimulatedTimeController::~GlobalSimulatedTimeController() = default;
407
408Clock* GlobalSimulatedTimeController::GetClock() {
409 return &sim_clock_;
410}
411
412TaskQueueFactory* GlobalSimulatedTimeController::GetTaskQueueFactory() {
413 return &impl_;
414}
415
416std::unique_ptr<ProcessThread>
417GlobalSimulatedTimeController::CreateProcessThread(const char* thread_name) {
418 return impl_.CreateProcessThread(thread_name);
419}
420
421void GlobalSimulatedTimeController::Sleep(TimeDelta duration) {
422 rtc::ScopedYieldPolicy yield_policy(&impl_);
423 Timestamp current_time = impl_.CurrentTime();
424 Timestamp target_time = current_time + duration;
425 RTC_DCHECK_EQ(current_time.us(), rtc::TimeMicros());
426 while (current_time < target_time) {
427 impl_.RunReadyRunners();
428 Timestamp next_time = std::min(impl_.NextRunTime(), target_time);
429 impl_.AdvanceTime(next_time);
430 auto delta = next_time - current_time;
431 current_time = next_time;
432 sim_clock_.AdvanceTimeMicroseconds(delta.us());
Sebastian Janssond624c392019-04-17 10:36:03 +0200433 global_clock_.AdvanceTime(delta);
Sebastian Jansson0d617cc2019-03-22 15:22:16 +0100434 }
435}
436
437void GlobalSimulatedTimeController::InvokeWithControlledYield(
438 std::function<void()> closure) {
439 rtc::ScopedYieldPolicy yield_policy(&impl_);
440 closure();
441}
442
443// namespace sim_time_impl
444
445} // namespace webrtc