blob: c8be03f0a74e3576cc9aa7dc1f09fd952f20aa4f [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#ifndef API_IMPL_TASK_RUNNER_IMPL_H_
6#define API_IMPL_TASK_RUNNER_IMPL_H_
7
8#include <atomic>
9#include <condition_variable> // NOLINT
10#include <deque>
11#include <functional>
12#include <memory>
13#include <mutex> // NOLINT
14#include <queue>
15#include <thread> // NOLINT
16#include <utility>
17#include <vector>
18
19#include "absl/base/thread_annotations.h"
20#include "absl/types/optional.h"
21#include "api/public/task_runner.h"
22#include "platform/api/time.h"
23
24namespace openscreen {
25namespace platform {
26
27class TaskRunnerImpl : public TaskRunner {
28 public:
29 explicit TaskRunnerImpl(platform::ClockNowFunctionPtr now_function);
30
31 // TaskRunner overrides
32 ~TaskRunnerImpl() override;
33 void PostTask(Task task) override;
34 void PostTaskWithDelay(Task task, Clock::duration delay) override;
35
Jordan Baylesa8e96772019-04-08 10:53:54 -070036 // Tasks will only be executed if RunUntilStopped has been called, and
37 // RequestStopSoon has not. Important note: TaskRunnerImpl does NOT do any
38 // threading, so calling "RunUntilStopped()" will block whatever thread you
39 // are calling it on.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070040 void RunUntilStopped();
Jordan Baylesa8e96772019-04-08 10:53:54 -070041
42 // Thread-safe method for requesting the TaskRunnerImpl to stop running. This
43 // sets a flag that will get checked in the run loop, typically after
44 // completing the current task.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070045 void RequestStopSoon();
46
47 // Execute all tasks immediately, useful for testing only. Note: this method
48 // will schedule any delayed tasks that are ready to run, but does not block
49 // waiting for delayed tasks to become eligible.
50 void RunUntilIdleForTesting();
51
52 private:
53 struct DelayedTask {
Jordan Baylesa8e96772019-04-08 10:53:54 -070054 DelayedTask(Task task_, Clock::time_point runnable_after_)
55 : task(std::move(task_)), runnable_after(runnable_after_) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070056
57 Task task;
Jordan Baylesa8e96772019-04-08 10:53:54 -070058 Clock::time_point runnable_after;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070059
60 bool operator>(const DelayedTask& other) const {
Jordan Baylesa8e96772019-04-08 10:53:54 -070061 return this->runnable_after > other.runnable_after;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070062 }
63 };
64
Jordan Baylesa8e96772019-04-08 10:53:54 -070065 // Run all tasks already in the task queue. If the queue is empty, wait for
66 // either (1) a delayed task to become available, or (2) a task to be added
67 // to the queue.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070068 void RunCurrentTasksBlocking();
Jordan Baylesa8e96772019-04-08 10:53:54 -070069
70 // Run tasks already in the queue, for testing. If the queue is empty, this
71 // method does not block but instead returns immediately.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070072 void RunCurrentTasksForTesting();
73
Jordan Baylesa8e96772019-04-08 10:53:54 -070074 // Loop method that runs tasks in the current thread, until the
75 // RequestStopSoon method is called.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070076 void RunTasksUntilStopped();
77
78 // Look at all tasks in the delayed task queue, then schedule them if the
79 // minimum delay time has elapsed.
80 void ScheduleDelayedTasks();
81
Jordan Bayles5d72bc22019-04-09 13:33:52 -070082 // Look at the current state of the TaskRunner and determine if the run loop
83 // should be woken up
84 bool ShouldWakeUpRunLoop();
85
Jordan Baylesa8e96772019-04-08 10:53:54 -070086 // Takes the task_mutex_ lock, returning immediately if work is available. If
87 // no work is available, this places the task running thread into a waiting
88 // state until we stop running or work is available.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070089 std::unique_lock<std::mutex> WaitForWorkAndAcquireLock();
90
91 const platform::ClockNowFunctionPtr now_function_;
92
93 // Atomic so that we can perform atomic exchanges.
94 std::atomic_bool is_running_;
95
96 // This mutex is used for both tasks_ and notifying the run loop to wake
97 // up when it is waiting for a task to be added to the queue in
98 // run_loop_wakeup_.
99 std::mutex task_mutex_;
100 std::priority_queue<DelayedTask,
101 std::vector<DelayedTask>,
102 std::greater<DelayedTask>>
103 delayed_tasks_ GUARDED_BY(task_mutex_);
104
105 std::condition_variable run_loop_wakeup_;
106 std::deque<Task> tasks_ GUARDED_BY(task_mutex_);
107};
108} // namespace platform
109} // namespace openscreen
110
111#endif // API_IMPL_TASK_RUNNER_IMPL_H_