roo_scheduler
API Documentation for roo_scheduler
Loading...
Searching...
No Matches
roo_scheduler.cpp
Go to the documentation of this file.
1#include "roo_scheduler.h"
2
3#include <algorithm>
4
5namespace roo_scheduler {
6
7Scheduler::Scheduler() : queue_(), next_execution_id_(0), canceled_(0) {}
8
9ExecutionID Scheduler::scheduleOn(roo_time::Uptime when, Executable& task,
10 Priority priority) {
11 roo::lock_guard<roo::mutex> lock(mutex_);
12 nonempty_.notify_all();
13 return push(when, &task, false, priority);
14}
15
16ExecutionID Scheduler::scheduleOn(roo_time::Uptime when,
17 std::unique_ptr<Executable> task,
18 Priority priority) {
19 roo::lock_guard<roo::mutex> lock(mutex_);
20 nonempty_.notify_all();
21 return push(when, task.release(), true, priority);
22}
23
24ExecutionID Scheduler::scheduleOn(roo_time::Uptime when,
25 std::function<void()> task,
26 Priority priority) {
27 roo::lock_guard<roo::mutex> lock(mutex_);
28 nonempty_.notify_all();
29 return push(when, new Task(std::move(task)), true, priority);
30}
31
32ExecutionID Scheduler::scheduleAfter(roo_time::Duration delay, Executable& task,
33 Priority priority) {
34 roo::lock_guard<roo::mutex> lock(mutex_);
35 nonempty_.notify_all();
36 return push(roo_time::Uptime::Now() + delay, &task, false, priority);
37}
38
39ExecutionID Scheduler::scheduleAfter(roo_time::Duration delay,
40 std::unique_ptr<Executable> task,
41 Priority priority) {
42 roo::lock_guard<roo::mutex> lock(mutex_);
43 nonempty_.notify_all();
44 return push(roo_time::Uptime::Now() + delay, task.release(), true, priority);
45}
46
47ExecutionID Scheduler::scheduleAfter(roo_time::Duration delay,
48 std::function<void()> task,
49 Priority priority) {
50 roo::lock_guard<roo::mutex> lock(mutex_);
51 nonempty_.notify_all();
52 return push(roo_time::Uptime::Now() + delay, new Task(std::move(task)), true,
53 priority);
54}
55
56ExecutionID Scheduler::push(roo_time::Uptime when, Executable* task,
57 bool owns_task, Priority priority) {
58 ExecutionID id = next_execution_id_;
59 ++next_execution_id_;
60 // Reserve negative IDs for special use.
61 next_execution_id_ &= 0x7FFFFFFF;
62 queue_.emplace_back(id, task, owns_task, when, priority);
63 std::push_heap(queue_.begin(), queue_.end(), TimeComparator());
64 return id;
65}
66
67// The queue must be non-empty.
68void Scheduler::pop() {
69 std::pop_heap(queue_.begin(), queue_.end(), TimeComparator());
70 queue_.pop_back();
71 // Fix the possibly broken invariant - get a non-cancelled task, if exists, at
72 // the top of the queue.
73 while (!canceled_.empty()) {
74 if (queue_.empty()) {
75 canceled_.clear();
76 return;
77 }
78 ExecutionID id = queue_.front().id();
79 if (!canceled_.erase(id)) return;
80 std::pop_heap(queue_.begin(), queue_.end(), TimeComparator());
81 queue_.pop_back();
82 }
83}
84
85bool Scheduler::executeEligibleTasksUpTo(roo_time::Uptime deadline,
86 Priority min_priority, int max_tasks) {
87 while (max_tasks < 0 || max_tasks-- > 0) {
88 if (!runOneEligibleExecution(deadline, min_priority)) return true;
89 }
90 return false;
91}
92
93bool Scheduler::executeEligibleTasks(Priority min_priority, int max_tasks) {
94 while (max_tasks < 0 || max_tasks-- > 0) {
95 if (!runOneEligibleExecution(roo_time::Uptime::Now(), min_priority))
96 return true;
97 }
98 return false;
99}
100
101roo_time::Uptime Scheduler::getNearestExecutionTime() const {
102 roo::lock_guard<roo::mutex> lock(mutex_);
103 return getNearestExecutionTimeWithLockHeld();
104}
105
106roo_time::Uptime Scheduler::getNearestExecutionTimeWithLockHeld() const {
107#if !ROO_SCHEDULER_IGNORE_PRIORITY
108 if (!ready_.empty()) {
109 return roo_time::Uptime::Now();
110 }
111#endif
112 if (!queue_.empty()) {
113 return queue_.front().when();
114 }
115 return roo_time::Uptime::Max();
116}
117
118roo_time::Duration Scheduler::getNearestExecutionDelay() const {
119 roo::lock_guard<roo::mutex> lock(mutex_);
120 return getNearestExecutionDelayWithLockHeld();
121}
122
123roo_time::Duration Scheduler::getNearestExecutionDelayWithLockHeld() const {
124#if !ROO_SCHEDULER_IGNORE_PRIORITY
125 if (!ready_.empty()) {
126 return roo_time::Duration();
127 }
128#endif
129 if (!queue_.empty()) {
130 roo_time::Uptime next = queue_.front().when();
131 roo_time::Uptime now = roo_time::Uptime::Now();
132 return (next < now ? roo_time::Duration() : next - now);
133 }
134 return roo_time::Duration::Max();
135}
136
137#if !ROO_SCHEDULER_IGNORE_PRIORITY
138bool Scheduler::runOneEligibleExecution(roo_time::Uptime deadline,
139 Priority min_priority) {
140 roo_time::Uptime now = roo_time::Uptime::Now();
141 if (deadline > now) deadline = now;
142 Entry to_execute;
143 {
144 roo::lock_guard<roo::mutex> lock(mutex_);
145 // Move all due tasks to the ready queue.
146 while (!queue_.empty() && queue_.front().when() <= deadline) {
147 ready_.push_back(std::move(queue_.front()));
148 std::push_heap(ready_.begin(), ready_.end(), PriorityComparator());
149 pop();
150 }
151 while (!ready_.empty()) {
152 Entry& entry = ready_.front();
153 bool canceled = canceled_.erase(entry.id());
154 if (!canceled) {
155 if (entry.priority() < min_priority) {
156 // Next ready task is too low priority.
157 return false;
158 }
159 to_execute = std::move(entry);
160 }
161 std::pop_heap(ready_.begin(), ready_.end(), PriorityComparator());
162 ready_.pop_back();
163 if (to_execute.task() != nullptr) {
164 // Found an eligible task (not canceled, with high enough priority).
165 break;
166 }
167 }
168 }
169 if (to_execute.task() == nullptr) {
170 // No ready tasks.
171 return false;
172 }
173 to_execute.task()->execute(to_execute.id());
174 return true;
175}
176#else
177bool Scheduler::runOneEligibleExecution(roo_time::Uptime deadline,
178 Priority min_priority) {
179 roo_time::Uptime now = roo_time::Uptime::Now();
180 if (deadline > now) deadline = now;
181 Entry to_execute;
182 {
183 roo::lock_guard<roo::mutex> lock(mutex_);
184 // Process all due tasks.
185 while (!queue_.empty() && queue_.front().when() <= deadline) {
186 Entry& entry = queue_.front();
187 // ExecutionID id = entry.id();
188 bool canceled = canceled_.erase(entry.id());
189 if (!canceled) {
190 if (entry.priority() < min_priority) {
191 // Next ready task is too low priority.
192 return false;
193 }
194 to_execute = std::move(entry);
195 }
196 pop();
197 if (to_execute.task() != nullptr) {
198 // Found an eligible task (not canceled, with high enough priority).
199 break;
200 }
201 }
202 }
203 if (to_execute.task() == nullptr) {
204 // No ready tasks.
205 return false;
206 }
207 to_execute.task()->execute(to_execute.id());
208 return true;
209}
210#endif
211
// NOTE(review): the header line of this definition is missing from this
// listing. Judging by the body and by the `void cancel(ExecutionID)` entry in
// the index at the end of the page, this is presumably
// Scheduler::cancel(ExecutionID id). Confirm.
//
// Cancels the execution identified by `id`, under mutex_. If that execution is
// at the front of queue_, it is removed right away via pop(); otherwise the ID
// is recorded in canceled_ so that the entry gets skipped later.
//
// NOTE(review): only queue_ is consulted here. When priorities are enabled, an
// entry that has already been moved to ready_ does not seem to be cancelable
// while queue_ is empty (the early return below). Confirm whether that is
// intended.
213 roo::lock_guard<roo::mutex> lock(mutex_);
214 if (queue_.empty()) {
215 // There is nothing to cancel.
216 return;
217 }
218 // Opportunistically check if the scheduled run is at the top of the queue and
219 // can be immediately removed.
220 if (queue_.front().id() == id) {
221 // Found, indeed!
222 pop();
223 return;
224 }
225 // The task might be scheduled behind others; need to defer cancellation.
226 canceled_.insert(id);
227}
228
// NOTE(review): the header line of this definition is missing from this
// listing. Judging by the body and by the `void pruneCanceled()` entry in the
// index at the end of the page, this is presumably Scheduler::pruneCanceled().
// Confirm.
//
// Removes from queue_, under mutex_, every entry whose ID is listed in
// canceled_, then empties canceled_. Each removed entry is overwritten by the
// last element (so index i is re-examined rather than advanced), and the heap
// order is rebuilt once at the end if anything was removed.
//
// NOTE(review): ready_ is not scanned here, yet canceled_ is cleared
// unconditionally. Confirm that no canceled entry can be sitting in ready_.
230 roo::lock_guard<roo::mutex> lock(mutex_);
231 if (canceled_.empty()) return;
232 bool modified = false;
233 size_t i = 0;
234 while (i < queue_.size()) {
235 if (canceled_.erase(queue_[i].id())) {
236 modified = true;
237 queue_[i] = std::move(queue_.back());
238 queue_.pop_back();
239 } else {
240 ++i;
241 }
// All canceled IDs have been matched; no need to scan the rest of the queue.
242 if (canceled_.empty()) break;
243 }
244 // Clear the canceled set, on the off chance that it contained any IDs that
245 // were not actually found in the queue at all.
246 canceled_.clear();
247 if (modified) {
248 std::make_heap(queue_.begin(), queue_.end(), TimeComparator());
249 }
250}
251
252void Scheduler::delay(roo_time::Duration delay, Priority min_priority) {
253 delayUntil(roo_time::Uptime::Now() + delay, min_priority);
254}
255
256void Scheduler::delayUntil(roo_time::Uptime deadline, Priority min_priority) {
257 while (roo_time::Uptime::Now() < deadline) {
258 if (executeEligibleTasks(1)) {
259 roo::unique_lock<roo::mutex> lock(mutex_);
260 roo_time::Uptime next = getNearestExecutionTimeWithLockHeld();
261 if (next > deadline) next = deadline;
262 auto now = roo_time::Uptime::Now();
263 if (next > now) {
264 nonempty_.wait_until(lock, next);
265 }
266 }
267 }
268 executeEligibleTasksUpTo(deadline, min_priority);
269}
270
// NOTE(review): the header line of this definition, and one line at the top
// of the loop body (listing line 273), are missing from this listing. Judging
// by the `void run()` entry in the index at the end of the page ("Runs
// scheduler event loop forever"), this is presumably Scheduler::run(), and the
// missing statement presumably executes the eligible tasks. Confirm.
//
// The visible part of the loop computes, under mutex_, the delay until the
// nearest execution, and blocks on nonempty_ for that long, or indefinitely if
// nothing is scheduled (the delay is Duration::Max()).
272 while (true) {
274 {
275 roo::unique_lock<roo::mutex> lock(mutex_);
276 roo_time::Duration delay = getNearestExecutionDelayWithLockHeld();
// A zero delay means that something is already due: skip the wait.
277 if (delay > roo_time::Duration()) {
278 if (delay == roo_time::Duration::Max()) {
279 nonempty_.wait(lock);
280 } else {
281 nonempty_.wait_for(lock, delay);
282 }
283 }
284 }
285 }
286}
287
288RepetitiveTask::RepetitiveTask(Scheduler& scheduler, roo_time::Duration delay,
289 std::function<void()> task, Priority priority)
290 : scheduler_(scheduler),
291 task_(task),
292 id_(-1),
293 active_(false),
294 priority_(priority),
295 delay_(delay) {}
296
297// Starts the task, scheduling the next execution after the specified delay.
298bool RepetitiveTask::start(roo_time::Duration initial_delay) {
299 if (active_) return false;
300 if (id_ >= 0) scheduler_.cancel(id_);
301 active_ = true;
302 id_ = scheduler_.scheduleAfter(initial_delay, *this, priority_);
303 return true;
304}
305
// NOTE(review): the header line of this definition is missing from this
// listing; by position and body, this is presumably RepetitiveTask::stop().
// Confirm.
//
// Deactivates the task. Returns false if it was not active, true otherwise.
// The pending execution is not canceled here; execute() checks active_ and
// returns early instead.
307 if (!active_) return false;
308 active_ = false;
309 return true;
310}
311
// NOTE(review): the header line of this definition is missing from this
// listing; by position and body, this is presumably
// RepetitiveTask::execute(ExecutionID id). Confirm.
//
// Runs the callable, then schedules the next execution `delay_` after the
// callable has returned.
// Ignore executions that are stale (ID mismatch) or that arrive after stop().
313 if (id != id_ || !active_) return;
314 task_();
// The callable itself may have stopped the task; do not re-schedule then.
315 if (!active_) return;
316 id_ = scheduler_.scheduleAfter(delay_, *this, priority_);
317}
318
// NOTE(review): the header line of this definition is missing from this
// listing; by position, this is presumably the RepetitiveTask destructor.
// Confirm.
//
// Cancels the last scheduled execution, if there was one.
320 if (id_ >= 0) scheduler_.cancel(id_);
321}
322
323PeriodicTask::PeriodicTask(Scheduler& scheduler, roo_time::Duration period,
324 std::function<void()> task, Priority priority)
325 : scheduler_(scheduler),
326 task_(task),
327 id_(-1),
328 active_(false),
329 priority_(priority),
330 period_(period) {}
331
332bool PeriodicTask::start(roo_time::Uptime when) {
333 if (active_) return false;
334 if (id_ >= 0) scheduler_.cancel(id_);
335 active_ = true;
336 next_ = when;
337 id_ = scheduler_.scheduleOn(next_, *this, priority_);
338 return true;
339}
340
// NOTE(review): the header line of this definition is missing from this
// listing; by position and body, this is presumably PeriodicTask::stop().
// Confirm.
//
// Deactivates the task. Returns false if it was not active, true otherwise.
// The pending execution is not canceled here; execute() checks active_ and
// returns early instead.
342 if (!active_) return false;
343 active_ = false;
344 return true;
345}
346
// NOTE(review): the header line of this definition is missing from this
// listing; by position and body, this is presumably
// PeriodicTask::execute(ExecutionID id). Confirm.
//
// Runs the callable, then schedules the next execution one period after the
// previous due time (not after the time the callable actually finished).
// Ignore executions that are stale (ID mismatch) or that arrive after stop().
348 if (id != id_ || !active_) return;
349 task_();
350 next_ += period_;
// The callable itself may have stopped the task; do not re-schedule then.
351 if (!active_) return;
352 id_ = scheduler_.scheduleOn(next_, *this, priority_);
353}
354
// NOTE(review): the header line of this definition is missing from this
// listing; by position, this is presumably the PeriodicTask destructor.
// Confirm.
//
// Cancels the last scheduled execution, if there was one.
356 if (id_ >= 0) scheduler_.cancel(id_);
357}
358
359SingletonTask::SingletonTask(Scheduler& scheduler, std::function<void()> task)
360 : scheduler_(scheduler), task_(task), id_(-1), scheduled_(false) {}
361
362void SingletonTask::scheduleOn(roo_time::Uptime when, Priority priority) {
363 if (scheduled_) scheduler_.cancel(id_);
364 id_ = scheduler_.scheduleOn(when, *this, priority);
365 scheduled_ = true;
366}
367
368void SingletonTask::scheduleAfter(roo_time::Duration delay, Priority priority) {
369 if (scheduled_) scheduler_.cancel(id_);
370 id_ = scheduler_.scheduleAfter(delay, *this, priority);
371 scheduled_ = true;
372}
373
// NOTE(review): the header line of this definition is missing from this
// listing. Judging by the body and by the `void scheduleNow(Priority
// priority=Priority::kNormal)` entry in the index at the end of the page, this
// is presumably SingletonTask::scheduleNow(Priority priority). Confirm.
//
// Schedules the task via Scheduler::scheduleNow(), canceling the pending
// execution first if there is one.
375 if (scheduled_) scheduler_.cancel(id_);
376 id_ = scheduler_.scheduleNow(*this, priority);
377 scheduled_ = true;
378}
379
// NOTE(review): the header line of this definition is missing from this
// listing; by position and body, this is presumably
// SingletonTask::execute(ExecutionID id). Confirm.
//
// Ignore executions that are stale (ID mismatch) or no longer expected.
381 if (!scheduled_ || id != id_) return;
// Reset the state before invoking the callable, so that the state is already
// "not scheduled" while the callable runs.
382 scheduled_ = false;
383 id_ = -1;
384 task_();
385}
386
// NOTE(review): the header line of this definition is missing from this
// listing; by position, this is presumably the SingletonTask destructor.
// Confirm.
//
// Cancels the pending execution, if there is one.
388 if (id_ >= 0) scheduler_.cancel(id_);
389}
390
// NOTE(review): the first line of this definition is missing from this
// listing. Judging by the index at the end of the page, this is presumably
// the tail of IteratingTask::IteratingTask(Scheduler& scheduler,
// Iterator& iterator, std::function<void()> done_cb). Confirm.
//
// Binds the task to the scheduler and the iterator, and stores the completion
// callback; the execution ID starts out as -1 (no execution).
392 std::function<void()> done_cb)
393 : scheduler_(scheduler), itr_(iterator), id_(-1), done_cb_(done_cb) {}
394
395bool IteratingTask::start(roo_time::Uptime when) {
396 if (is_active()) return false;
397 id_ = scheduler_.scheduleOn(when, *this);
398 return true;
399}
400
// NOTE(review): the header line of this definition is missing from this
// listing; by position and body, this is presumably
// IteratingTask::execute(ExecutionID id). Confirm.
//
// Asks the iterator for the next delay, in microseconds. A non-negative value
// schedules the next execution after that delay; a negative value ends the
// iteration and invokes the completion callback.
//
// NOTE(review): done_cb_ is invoked without checking that it is non-empty,
// while the index at the end of the page shows its default to be an empty
// std::function. Confirm that invoking the default is safe on the target
// platform.
402 int64_t next_delay_us = itr_.next();
403 if (next_delay_us >= 0) {
404 id_ = scheduler_.scheduleAfter(roo_time::Micros(next_delay_us), *this);
405 } else {
406 id_ = -1;
407 // This is the last thing we do, so that if the callback invokes our
408 // destructor, that's OK. (That said, the callback should also do so at
409 // the very end, because the callback is also destructing itself this
410 // way).
411 done_cb_();
412 }
413}
414
// NOTE(review): the header line of this definition is missing from this
// listing; by position, this is presumably the IteratingTask destructor.
// Confirm.
//
// Cancels the pending execution, if there is one.
416 if (id_ >= 0) scheduler_.cancel(id_);
417}
418
419} // namespace roo_scheduler
Abstract interface for executable tasks in the scheduler queue.
void execute(ExecutionID id) override
bool start(roo_time::Uptime when=roo_time::Uptime::Now())
IteratingTask(Scheduler &scheduler, Iterator &iterator, std::function< void()> done_cb=std::function< void()>())
void execute(ExecutionID id) override
bool start(roo_time::Uptime when=roo_time::Uptime::Now())
PeriodicTask(Scheduler &scheduler, roo_time::Duration period, std::function< void()> task, Priority priority=Priority::kNormal)
bool start()
Starts task using configured periodic delay.
void execute(ExecutionID id) override
RepetitiveTask(Scheduler &scheduler, roo_time::Duration delay, std::function< void()> task, Priority priority=Priority::kNormal)
Schedules and dispatches delayed task executions.
bool executeEligibleTasksUpTo(roo_time::Uptime deadline, Priority min_priority=Priority::kMinimum, int max_count=-1)
Executes up to max_count eligible tasks due no later than deadline.
Scheduler()
Creates an empty scheduler.
ExecutionID scheduleAfter(roo_time::Duration delay, Executable &task, Priority priority=Priority::kNormal)
Schedules execution after delay elapses.
void cancel(ExecutionID)
Marks execution identified by id as canceled.
void run()
Runs scheduler event loop forever.
ExecutionID scheduleNow(Executable &task, Priority priority=Priority::kNormal)
Schedules execution as soon as possible.
void delay(roo_time::Duration delay, Priority min_priority=Priority::kNormal)
Delays for at least delay while executing scheduled work.
void delayUntil(roo_time::Uptime deadline, Priority min_priority=Priority::kNormal)
Delays until deadline while executing scheduled work.
roo_time::Uptime getNearestExecutionTime() const
Returns due time of the nearest upcoming execution.
roo_time::Duration getNearestExecutionDelay() const
Returns delay to the nearest upcoming execution.
void pruneCanceled()
Removes canceled executions from the queue.
ExecutionID scheduleOn(roo_time::Uptime when, Executable &task, Priority priority=Priority::kNormal)
Schedules execution no earlier than when.
bool executeEligibleTasks(Priority min_priority, int max_count=-1)
Executes up to max_count eligible tasks with at least min_priority.
void scheduleAfter(roo_time::Duration delay, Priority priority=Priority::kNormal)
Schedules or reschedules task after delay.
void scheduleNow(Priority priority=Priority::kNormal)
Schedules or reschedules task for immediate execution.
void execute(ExecutionID id) override
SingletonTask(Scheduler &scheduler, std::function< void()> task)
void scheduleOn(roo_time::Uptime when, Priority priority=Priority::kNormal)
Schedules or reschedules task at absolute time when.
Convenience adapter for one-time execution of an arbitrary callable.
Priority
Priority controls dispatch order among eligible tasks.
int32_t ExecutionID
Represents a unique task execution identifier.