Fix panic when a handler triggers handler scheduling #21
@@ -57,7 +57,7 @@ impl HandlerQueues {
|
||||
let id = HandlerId::from(&handler);
|
||||
let insert_at = match self
|
||||
.timer_handlers
|
||||
.binary_search_by(|t| t.when.cmp(&handler.when))
|
||||
.binary_search_by(|t| handler.when.cmp(&t.when))
|
||||
{
|
||||
Ok(i) => i,
|
||||
Err(i) => i,
|
||||
@@ -101,23 +101,7 @@ impl HandlerQueues {
|
||||
/// If a timer handler that is run should repeat, it will be scheduled
|
||||
/// again.
|
||||
pub fn check_timer_handlers(&mut self, now: Instant) {
|
||||
let mut reschedule = Vec::new();
|
||||
loop {
|
||||
let Some(mut t) = self.timer_handlers.pop() else {
|
||||
break;
|
||||
};
|
||||
if t.when < now {
|
||||
(t.callback)();
|
||||
if !t.delay.is_zero() {
|
||||
t.when = now + t.delay;
|
||||
reschedule.push(t);
|
||||
}
|
||||
} else {
|
||||
reschedule.push(t);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let reschedule = run_timer_handlers(mem::take(&mut self.timer_handlers), now);
|
||||
for t in reschedule.into_iter() {
|
||||
self.enqueue_timer(t);
|
||||
}
|
||||
@@ -176,6 +160,32 @@ impl HandlerQueues {
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs each of `handlers` that should run before `now` and returns those that
|
||||
/// need to be rescheduled, plus any that weren't run, in arbitrary order.
|
||||
fn run_timer_handlers(
|
||||
mut handlers: Vec<Box<TimerHandler>>,
|
||||
now: Instant,
|
||||
) -> Vec<Box<TimerHandler>> {
|
||||
let mut reschedule = Vec::new();
|
||||
loop {
|
||||
let Some(mut t) = handlers.pop() else {
|
||||
break;
|
||||
};
|
||||
if t.when < now {
|
||||
(t.callback)();
|
||||
if !t.delay.is_zero() {
|
||||
t.when = now + t.delay;
|
||||
reschedule.push(t);
|
||||
}
|
||||
} else {
|
||||
reschedule.push(t);
|
||||
break;
|
||||
}
|
||||
}
|
||||
reschedule.extend(handlers);
|
||||
reschedule
|
||||
}
|
||||
|
||||
/// Tracks a callback, a time when it should be called, and an interval for
|
||||
/// optionally repeating the callback.
|
||||
///
|
||||
@@ -224,11 +234,117 @@ fn run_global_idle_handlers() -> QueueStatus {
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::atomic;
|
||||
|
||||
#[test]
|
||||
fn timers_enqueued_in_order() {
|
||||
let mut handlers = HandlerQueues::new();
|
||||
|
||||
assert!(handlers.timer_handlers.is_empty());
|
||||
|
||||
let now = Instant::now();
|
||||
handlers.enqueue_once_timer(Box::new(|| {}), now);
|
||||
handlers.enqueue_once_timer(Box::new(|| {}), now + Duration::from_millis(2));
|
||||
handlers.enqueue_once_timer(Box::new(|| {}), now + Duration::from_millis(1));
|
||||
|
||||
assert_eq!(handlers.timer_handlers.len(), 3);
|
||||
// Timers are popped off the end, so we want things to be in descending order.
|
||||
assert_eq!(handlers.timer_handlers[2].when, now);
|
||||
assert_eq!(
|
||||
handlers.timer_handlers[1].when,
|
||||
now + Duration::from_millis(1)
|
||||
);
|
||||
assert_eq!(
|
||||
handlers.timer_handlers[0].when,
|
||||
now + Duration::from_millis(2)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn run_timer_handlers_remembers_unexpired() {
|
||||
let mut handlers = HandlerQueues::new();
|
||||
static COUNTER_A: atomic::AtomicI32 = atomic::AtomicI32::new(0);
|
||||
static COUNTER_B: atomic::AtomicI32 = atomic::AtomicI32::new(0);
|
||||
|
||||
let now = Instant::now();
|
||||
handlers.enqueue_persistent_timer(
|
||||
Box::new(|| {
|
||||
COUNTER_A.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
}),
|
||||
now,
|
||||
Duration::from_millis(10),
|
||||
);
|
||||
handlers.enqueue_persistent_timer(
|
||||
Box::new(|| {
|
||||
COUNTER_B.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
}),
|
||||
now + Duration::from_millis(10),
|
||||
Duration::from_millis(10),
|
||||
);
|
||||
|
||||
// No timer handlers have run.
|
||||
assert_eq!(COUNTER_A.load(atomic::Ordering::SeqCst), 0);
|
||||
assert_eq!(COUNTER_B.load(atomic::Ordering::SeqCst), 0);
|
||||
|
||||
handlers.check_timer_handlers(now + Duration::from_millis(1));
|
||||
|
||||
// Only the timer that goes off immediately after `now` should have run.
|
||||
assert_eq!(COUNTER_A.load(atomic::Ordering::SeqCst), 1);
|
||||
assert_eq!(COUNTER_B.load(atomic::Ordering::SeqCst), 0);
|
||||
|
||||
// Both timers are still in queue.
|
||||
assert_eq!(handlers.timer_handlers.len(), 2);
|
||||
|
||||
for mut t in handlers.timer_handlers.into_iter() {
|
||||
(t.callback)();
|
||||
}
|
||||
|
||||
// Both values should have been incremented.
|
||||
assert_eq!(COUNTER_A.load(atomic::Ordering::SeqCst), 2);
|
||||
assert_eq!(COUNTER_B.load(atomic::Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn repeating_timer_debounce() {
|
||||
let mut handlers = HandlerQueues::new();
|
||||
static COUNTER: atomic::AtomicI32 = atomic::AtomicI32::new(0);
|
||||
|
||||
let now = Instant::now();
|
||||
handlers.enqueue_persistent_timer(
|
||||
Box::new(|| {
|
||||
COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
}),
|
||||
now,
|
||||
Duration::from_millis(2),
|
||||
);
|
||||
|
||||
// Timer handler hasn't run.
|
||||
assert_eq!(COUNTER.load(atomic::Ordering::SeqCst), 0);
|
||||
|
||||
// Timer should run every 2 ms, but 10 ms have passed.
|
||||
handlers.check_timer_handlers(now + Duration::from_millis(10));
|
||||
|
||||
// Timer should only have run once.
|
||||
assert_eq!(COUNTER.load(atomic::Ordering::SeqCst), 1);
|
||||
|
||||
// Timer has been rescheduled for an appropriate time.
|
||||
assert_eq!(handlers.timer_handlers.len(), 1);
|
||||
assert_eq!(
|
||||
handlers.timer_handlers[0].when,
|
||||
now + Duration::from_millis(12)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub mod ffi {
|
||||
use super::*;
|
||||
use crate::sendable::Sendable;
|
||||
use std::{
|
||||
ffi::{c_int, c_uint, c_void},
|
||||
mem,
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
@@ -298,7 +414,15 @@ pub mod ffi {
|
||||
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn W_CheckTimerHandlers() {
|
||||
with_global_handlers(|handlers| handlers.check_timer_handlers(Instant::now()));
|
||||
let reschedule = run_timer_handlers(
|
||||
with_global_handlers(|handlers| mem::take(&mut handlers.timer_handlers)),
|
||||
Instant::now(),
|
||||
);
|
||||
with_global_handlers(|handlers| {
|
||||
for t in reschedule.into_iter() {
|
||||
handlers.enqueue_timer(t);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[unsafe(no_mangle)]
|
||||
|
||||
Reference in New Issue
Block a user