11 roo::lock_guard<roo::mutex> lock(mutex_);
12 nonempty_.notify_all();
13 return push(when, &task,
false, priority);
17 std::unique_ptr<Executable> task,
19 roo::lock_guard<roo::mutex> lock(mutex_);
20 nonempty_.notify_all();
21 return push(when, task.release(),
true, priority);
25 std::function<
void()> task,
27 roo::lock_guard<roo::mutex> lock(mutex_);
28 nonempty_.notify_all();
29 return push(when,
new Task(std::move(task)),
true, priority);
34 roo::lock_guard<roo::mutex> lock(mutex_);
35 nonempty_.notify_all();
36 return push(roo_time::Uptime::Now() +
delay, &task,
false, priority);
40 std::unique_ptr<Executable> task,
42 roo::lock_guard<roo::mutex> lock(mutex_);
43 nonempty_.notify_all();
44 return push(roo_time::Uptime::Now() +
delay, task.release(),
true, priority);
48 std::function<
void()> task,
50 roo::lock_guard<roo::mutex> lock(mutex_);
51 nonempty_.notify_all();
52 return push(roo_time::Uptime::Now() +
delay,
new Task(std::move(task)),
true,
61 next_execution_id_ &= 0x7FFFFFFF;
62 queue_.emplace_back(
id, task, owns_task, when, priority);
63 std::push_heap(queue_.begin(), queue_.end(), TimeComparator());
68void Scheduler::pop() {
69 std::pop_heap(queue_.begin(), queue_.end(), TimeComparator());
73 while (!canceled_.empty()) {
79 if (!canceled_.erase(
id))
return;
80 std::pop_heap(queue_.begin(), queue_.end(), TimeComparator());
86 Priority min_priority,
int max_tasks) {
87 while (max_tasks < 0 || max_tasks-- > 0) {
88 if (!runOneEligibleExecution(deadline, min_priority))
return true;
94 while (max_tasks < 0 || max_tasks-- > 0) {
95 if (!runOneEligibleExecution(roo_time::Uptime::Now(), min_priority))
102 roo::lock_guard<roo::mutex> lock(mutex_);
103 return getNearestExecutionTimeWithLockHeld();
106roo_time::Uptime Scheduler::getNearestExecutionTimeWithLockHeld()
const {
107#if !ROO_SCHEDULER_IGNORE_PRIORITY
108 if (!ready_.empty()) {
109 return roo_time::Uptime::Now();
112 if (!queue_.empty()) {
113 return queue_.front().when();
115 return roo_time::Uptime::Max();
119 roo::lock_guard<roo::mutex> lock(mutex_);
120 return getNearestExecutionDelayWithLockHeld();
123roo_time::Duration Scheduler::getNearestExecutionDelayWithLockHeld()
const {
124#if !ROO_SCHEDULER_IGNORE_PRIORITY
125 if (!ready_.empty()) {
126 return roo_time::Duration();
129 if (!queue_.empty()) {
130 roo_time::Uptime next = queue_.front().when();
131 roo_time::Uptime now = roo_time::Uptime::Now();
132 return (next < now ? roo_time::Duration() : next - now);
134 return roo_time::Duration::Max();
137#if !ROO_SCHEDULER_IGNORE_PRIORITY
138bool Scheduler::runOneEligibleExecution(roo_time::Uptime deadline,
140 roo_time::Uptime now = roo_time::Uptime::Now();
141 if (deadline > now) deadline = now;
144 roo::lock_guard<roo::mutex> lock(mutex_);
146 while (!queue_.empty() && queue_.front().when() <= deadline) {
147 ready_.push_back(std::move(queue_.front()));
148 std::push_heap(ready_.begin(), ready_.end(), PriorityComparator());
151 while (!ready_.empty()) {
152 Entry& entry = ready_.front();
153 bool canceled = canceled_.erase(entry.id());
155 if (entry.priority() < min_priority) {
159 to_execute = std::move(entry);
161 std::pop_heap(ready_.begin(), ready_.end(), PriorityComparator());
163 if (to_execute.task() !=
nullptr) {
169 if (to_execute.task() ==
nullptr) {
173 to_execute.task()->execute(to_execute.id());
177bool Scheduler::runOneEligibleExecution(roo_time::Uptime deadline,
179 roo_time::Uptime now = roo_time::Uptime::Now();
180 if (deadline > now) deadline = now;
183 roo::lock_guard<roo::mutex> lock(mutex_);
185 while (!queue_.empty() && queue_.front().when() <= deadline) {
186 Entry& entry = queue_.front();
188 bool canceled = canceled_.erase(entry.id());
190 if (entry.priority() < min_priority) {
194 to_execute = std::move(entry);
197 if (to_execute.task() !=
nullptr) {
203 if (to_execute.task() ==
nullptr) {
207 to_execute.task()->execute(to_execute.id());
213 roo::lock_guard<roo::mutex> lock(mutex_);
214 if (queue_.empty()) {
220 if (queue_.front().id() ==
id) {
226 canceled_.insert(
id);
230 roo::lock_guard<roo::mutex> lock(mutex_);
231 if (canceled_.empty())
return;
232 bool modified =
false;
234 while (i < queue_.size()) {
235 if (canceled_.erase(queue_[i].id())) {
237 queue_[i] = std::move(queue_.back());
242 if (canceled_.empty())
break;
248 std::make_heap(queue_.begin(), queue_.end(), TimeComparator());
257 while (roo_time::Uptime::Now() < deadline) {
259 roo::unique_lock<roo::mutex> lock(mutex_);
260 roo_time::Uptime next = getNearestExecutionTimeWithLockHeld();
261 if (next > deadline) next = deadline;
262 auto now = roo_time::Uptime::Now();
264 nonempty_.wait_until(lock, next);
275 roo::unique_lock<roo::mutex> lock(mutex_);
276 roo_time::Duration
delay = getNearestExecutionDelayWithLockHeld();
277 if (
delay > roo_time::Duration()) {
278 if (
delay == roo_time::Duration::Max()) {
279 nonempty_.wait(lock);
281 nonempty_.wait_for(lock,
delay);
289 std::function<
void()> task,
Priority priority)
290 : scheduler_(scheduler),
299 if (active_)
return false;
300 if (id_ >= 0) scheduler_.
cancel(id_);
302 id_ = scheduler_.
scheduleAfter(initial_delay, *
this, priority_);
307 if (!active_)
return false;
313 if (
id != id_ || !active_)
return;
315 if (!active_)
return;
320 if (id_ >= 0) scheduler_.
cancel(id_);
324 std::function<
void()> task,
Priority priority)
325 : scheduler_(scheduler),
333 if (active_)
return false;
334 if (id_ >= 0) scheduler_.
cancel(id_);
337 id_ = scheduler_.
scheduleOn(next_, *
this, priority_);
342 if (!active_)
return false;
348 if (
id != id_ || !active_)
return;
351 if (!active_)
return;
352 id_ = scheduler_.
scheduleOn(next_, *
this, priority_);
356 if (id_ >= 0) scheduler_.
cancel(id_);
360 : scheduler_(scheduler), task_(task), id_(-1), scheduled_(false) {}
363 if (scheduled_) scheduler_.
cancel(id_);
364 id_ = scheduler_.
scheduleOn(when, *
this, priority);
369 if (scheduled_) scheduler_.
cancel(id_);
375 if (scheduled_) scheduler_.
cancel(id_);
381 if (!scheduled_ ||
id != id_)
return;
388 if (id_ >= 0) scheduler_.
cancel(id_);
392 std::function<
void()> done_cb)
393 : scheduler_(scheduler), itr_(iterator), id_(-1), done_cb_(done_cb) {}
402 int64_t next_delay_us = itr_.
next();
403 if (next_delay_us >= 0) {
404 id_ = scheduler_.
scheduleAfter(roo_time::Micros(next_delay_us), *
this);
416 if (id_ >= 0) scheduler_.
cancel(id_);
Abstract interface for executable tasks in the scheduler queue.
void execute(ExecutionID id) override
bool start(roo_time::Uptime when=roo_time::Uptime::Now())
IteratingTask(Scheduler &scheduler, Iterator &iterator, std::function< void()> done_cb=std::function< void()>())
void execute(ExecutionID id) override
bool start(roo_time::Uptime when=roo_time::Uptime::Now())
PeriodicTask(Scheduler &scheduler, roo_time::Duration period, std::function< void()> task, Priority priority=Priority::kNormal)
bool start()
Starts task using configured periodic delay.
void execute(ExecutionID id) override
RepetitiveTask(Scheduler &scheduler, roo_time::Duration delay, std::function< void()> task, Priority priority=Priority::kNormal)
Schedules and dispatches delayed task executions.
bool executeEligibleTasksUpTo(roo_time::Uptime deadline, Priority min_priority=Priority::kMinimum, int max_count=-1)
Executes up to max_count eligible tasks due no later than deadline.
Scheduler()
Creates an empty scheduler.
ExecutionID scheduleAfter(roo_time::Duration delay, Executable &task, Priority priority=Priority::kNormal)
Schedules execution after delay elapses.
void cancel(ExecutionID)
Marks execution identified by id as canceled.
void run()
Runs scheduler event loop forever.
ExecutionID scheduleNow(Executable &task, Priority priority=Priority::kNormal)
Schedules execution as soon as possible.
void delay(roo_time::Duration delay, Priority min_priority=Priority::kNormal)
Delays for at least delay while executing scheduled work.
void delayUntil(roo_time::Uptime deadline, Priority min_priority=Priority::kNormal)
Delays until deadline while executing scheduled work.
roo_time::Uptime getNearestExecutionTime() const
Returns due time of the nearest upcoming execution.
roo_time::Duration getNearestExecutionDelay() const
Returns delay to the nearest upcoming execution.
void pruneCanceled()
Removes canceled executions from the queue.
ExecutionID scheduleOn(roo_time::Uptime when, Executable &task, Priority priority=Priority::kNormal)
Schedules execution no earlier than when.
bool executeEligibleTasks(Priority min_priority, int max_count=-1)
Executes up to max_count eligible tasks with at least min_priority.
void scheduleAfter(roo_time::Duration delay, Priority priority=Priority::kNormal)
Schedules or reschedules task after delay.
void scheduleNow(Priority priority=Priority::kNormal)
Schedules or reschedules task for immediate execution.
void execute(ExecutionID id) override
SingletonTask(Scheduler &scheduler, std::function< void()> task)
void scheduleOn(roo_time::Uptime when, Priority priority=Priority::kNormal)
Schedules or reschedules task at absolute time when.
Convenience adapter for one-time execution of an arbitrary callable.
Priority
Priority controls dispatch order among eligible tasks.
int32_t ExecutionID
Represents a unique task execution identifier.