// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef PLATFORM_BASE_TASK_RUNNER_IMPL_H_
#define PLATFORM_BASE_TASK_RUNNER_IMPL_H_

#include <atomic>
#include <chrono>
#include <condition_variable>  // NOLINT
#include <deque>
#include <functional>
#include <memory>
#include <mutex>  // NOLINT
#include <queue>
#include <thread>  // NOLINT
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/types/optional.h"
#include "osp_base/error.h"
#include "platform/api/task_runner.h"
#include "platform/api/time.h"

25namespace openscreen {
26namespace platform {
27
28class TaskRunnerImpl : public TaskRunner {
29 public:
btolschd94fe622019-05-09 14:21:40 -070030 class TaskWaiter {
31 public:
32 virtual ~TaskWaiter() = default;
33
34 // These calls should be thread-safe. The absolute minimum is that
35 // OnTaskPosted must be safe to call from another thread while this is
36 // inside WaitForTaskToBePosted. NOTE: There may be spurious wakeups from
37 // WaitForTaskToBePosted depending on whether the specific implementation
38 // chooses to clear queued WakeUps before entering WaitForTaskToBePosted.
39
40 // Blocks until some event occurs, which means new tasks may have been
41 // posted. Wait may only block up to |timeout| where 0 means don't block at
42 // all (not block forever).
43 virtual Error WaitForTaskToBePosted(Clock::duration timeout) = 0;
44
45 // If a WaitForTaskToBePosted call is currently blocking, unblock it
46 // immediately.
47 virtual void OnTaskPosted() = 0;
48 };
49
50 explicit TaskRunnerImpl(
51 platform::ClockNowFunctionPtr now_function,
52 TaskWaiter* event_waiter = nullptr,
53 Clock::duration waiter_timeout = std::chrono::milliseconds(100));
Jordan Baylesb0c191e2019-03-26 15:49:57 -070054
55 // TaskRunner overrides
56 ~TaskRunnerImpl() override;
57 void PostTask(Task task) override;
58 void PostTaskWithDelay(Task task, Clock::duration delay) override;
59
Jordan Baylesa8e96772019-04-08 10:53:54 -070060 // Tasks will only be executed if RunUntilStopped has been called, and
61 // RequestStopSoon has not. Important note: TaskRunnerImpl does NOT do any
62 // threading, so calling "RunUntilStopped()" will block whatever thread you
63 // are calling it on.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070064 void RunUntilStopped();
Jordan Baylesa8e96772019-04-08 10:53:54 -070065
66 // Thread-safe method for requesting the TaskRunnerImpl to stop running. This
67 // sets a flag that will get checked in the run loop, typically after
68 // completing the current task.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070069 void RequestStopSoon();
70
71 // Execute all tasks immediately, useful for testing only. Note: this method
72 // will schedule any delayed tasks that are ready to run, but does not block
73 // waiting for delayed tasks to become eligible.
74 void RunUntilIdleForTesting();
75
76 private:
77 struct DelayedTask {
Jordan Baylesa8e96772019-04-08 10:53:54 -070078 DelayedTask(Task task_, Clock::time_point runnable_after_)
79 : task(std::move(task_)), runnable_after(runnable_after_) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070080
81 Task task;
Jordan Baylesa8e96772019-04-08 10:53:54 -070082 Clock::time_point runnable_after;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070083
84 bool operator>(const DelayedTask& other) const {
Jordan Baylesa8e96772019-04-08 10:53:54 -070085 return this->runnable_after > other.runnable_after;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070086 }
87 };
88
Jordan Baylesa8e96772019-04-08 10:53:54 -070089 // Run all tasks already in the task queue. If the queue is empty, wait for
90 // either (1) a delayed task to become available, or (2) a task to be added
91 // to the queue.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070092 void RunCurrentTasksBlocking();
Jordan Baylesa8e96772019-04-08 10:53:54 -070093
94 // Run tasks already in the queue, for testing. If the queue is empty, this
95 // method does not block but instead returns immediately.
Jordan Baylesb0c191e2019-03-26 15:49:57 -070096 void RunCurrentTasksForTesting();
97
Jordan Baylesa8e96772019-04-08 10:53:54 -070098 // Loop method that runs tasks in the current thread, until the
99 // RequestStopSoon method is called.
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700100 void RunTasksUntilStopped();
101
102 // Look at all tasks in the delayed task queue, then schedule them if the
103 // minimum delay time has elapsed.
104 void ScheduleDelayedTasks();
105
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700106 // Look at the current state of the TaskRunner and determine if the run loop
107 // should be woken up
108 bool ShouldWakeUpRunLoop();
109
Jordan Baylesa8e96772019-04-08 10:53:54 -0700110 // Takes the task_mutex_ lock, returning immediately if work is available. If
111 // no work is available, this places the task running thread into a waiting
112 // state until we stop running or work is available.
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700113 std::unique_lock<std::mutex> WaitForWorkAndAcquireLock();
114
115 const platform::ClockNowFunctionPtr now_function_;
116
117 // Atomic so that we can perform atomic exchanges.
118 std::atomic_bool is_running_;
119
120 // This mutex is used for both tasks_ and notifying the run loop to wake
121 // up when it is waiting for a task to be added to the queue in
122 // run_loop_wakeup_.
123 std::mutex task_mutex_;
124 std::priority_queue<DelayedTask,
125 std::vector<DelayedTask>,
126 std::greater<DelayedTask>>
127 delayed_tasks_ GUARDED_BY(task_mutex_);
128
btolschd94fe622019-05-09 14:21:40 -0700129 // When |task_waiter_| is nullptr, |run_loop_wakeup_| is used for sleeping the
130 // task runner. Otherwise, |run_loop_wakeup_| isn't used and |task_waiter_|
131 // is used instead (along with |waiter_timeout_|).
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700132 std::condition_variable run_loop_wakeup_;
btolschd94fe622019-05-09 14:21:40 -0700133 TaskWaiter* const task_waiter_;
134 Clock::duration waiter_timeout_;
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700135 std::deque<Task> tasks_ GUARDED_BY(task_mutex_);
136};
137} // namespace platform
138} // namespace openscreen

#endif  // PLATFORM_BASE_TASK_RUNNER_IMPL_H_