Coverage Report

Created: 2021-01-22 16:54

crossbeam-utils/src/sync/parker.rs
Line
Count
Source (jump to first uncovered line)
1
use crate::primitive::sync::atomic::AtomicUsize;
2
use crate::primitive::sync::{Arc, Condvar, Mutex};
3
use core::sync::atomic::Ordering::SeqCst;
4
use std::fmt;
5
use std::marker::PhantomData;
6
use std::time::{Duration, Instant};
7
8
/// A thread parking primitive.
9
///
10
/// Conceptually, each `Parker` has an associated token which is initially not present:
11
///
12
/// * The [`park`] method blocks the current thread unless or until the token is available, at
13
///   which point it automatically consumes the token.
14
///
15
/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
16
///   a specified maximum time.
17
///
18
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
19
///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
20
///   returning immediately.
21
///
22
/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
23
/// [`park`] and [`unpark`].
24
///
25
/// # Examples
26
///
27
/// ```
28
/// use std::thread;
29
/// use std::time::Duration;
30
/// use crossbeam_utils::sync::Parker;
31
///
32
/// let p = Parker::new();
33
/// let u = p.unparker().clone();
34
///
35
/// // Make the token available.
36
/// u.unpark();
37
/// // Wakes up immediately and consumes the token.
38
/// p.park();
39
///
40
/// thread::spawn(move || {
41
///     thread::sleep(Duration::from_millis(500));
42
///     u.unpark();
43
/// });
44
///
45
/// // Wakes up when `u.unpark()` provides the token.
46
/// p.park();
47
/// ```
48
///
49
/// [`park`]: Parker::park
50
/// [`park_timeout`]: Parker::park_timeout
51
/// [`park_deadline`]: Parker::park_deadline
52
/// [`unpark`]: Unparker::unpark
53
pub struct Parker {
54
    unparker: Unparker,
55
    _marker: PhantomData<*const ()>,
56
}
57
58
unsafe impl Send for Parker {}
59
60
impl Default for Parker {
61
22
    fn default() -> Self {
62
22
        Self {
63
22
            unparker: Unparker {
64
22
                inner: Arc::new(Inner {
65
22
                    state: AtomicUsize::new(EMPTY),
66
22
                    lock: Mutex::new(()),
67
22
                    cvar: Condvar::new(),
68
22
                }),
69
22
            },
70
22
            _marker: PhantomData,
71
22
        }
72
22
    }
73
}
74
75
impl Parker {
76
    /// Creates a new `Parker`.
77
    ///
78
    /// # Examples
79
    ///
80
    /// ```
81
    /// use crossbeam_utils::sync::Parker;
82
    ///
83
    /// let p = Parker::new();
84
    /// ```
85
    ///
86
22
    pub fn new() -> Parker {
87
22
        Self::default()
88
22
    }
89
90
    /// Blocks the current thread until the token is made available.
91
    ///
92
    /// # Examples
93
    ///
94
    /// ```
95
    /// use crossbeam_utils::sync::Parker;
96
    ///
97
    /// let p = Parker::new();
98
    /// let u = p.unparker().clone();
99
    ///
100
    /// // Make the token available.
101
    /// u.unpark();
102
    ///
103
    /// // Wakes up immediately and consumes the token.
104
    /// p.park();
105
    /// ```
106
5
    pub fn park(&self) {
107
5
        self.unparker.inner.park(None);
108
5
    }
109
110
    /// Blocks the current thread until the token is made available, but only for a limited time.
111
    ///
112
    /// # Examples
113
    ///
114
    /// ```
115
    /// use std::time::Duration;
116
    /// use crossbeam_utils::sync::Parker;
117
    ///
118
    /// let p = Parker::new();
119
    ///
120
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
121
    /// p.park_timeout(Duration::from_millis(500));
122
    /// ```
123
31
    pub fn park_timeout(&self, timeout: Duration) {
124
31
        self.park_deadline(Instant::now() + timeout)
125
31
    }
126
127
    /// Blocks the current thread until the token is made available, or until a certain deadline.
128
    ///
129
    /// # Examples
130
    ///
131
    /// ```
132
    /// use std::time::{Duration, Instant};
133
    /// use crossbeam_utils::sync::Parker;
134
    ///
135
    /// let p = Parker::new();
136
    /// let deadline = Instant::now() + Duration::from_millis(500);
137
    ///
138
    /// // Waits for the token to become available, but will not wait longer than 500 ms.
139
    /// p.park_deadline(deadline);
140
    /// ```
141
32
    pub fn park_deadline(&self, deadline: Instant) {
142
32
        self.unparker.inner.park(Some(deadline))
143
32
    }
144
145
    /// Returns a reference to an associated [`Unparker`].
146
    ///
147
    /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
148
    ///
149
    /// # Examples
150
    ///
151
    /// ```
152
    /// use crossbeam_utils::sync::Parker;
153
    ///
154
    /// let p = Parker::new();
155
    /// let u = p.unparker().clone();
156
    ///
157
    /// // Make the token available.
158
    /// u.unpark();
159
    /// // Wakes up immediately and consumes the token.
160
    /// p.park();
161
    /// ```
162
    ///
163
    /// [`park`]: Parker::park
164
    /// [`park_timeout`]: Parker::park_timeout
165
25
    pub fn unparker(&self) -> &Unparker {
166
25
        &self.unparker
167
25
    }
168
169
    /// Converts a `Parker` into a raw pointer.
170
    ///
171
    /// # Examples
172
    ///
173
    /// ```
174
    /// use crossbeam_utils::sync::Parker;
175
    ///
176
    /// let p = Parker::new();
177
    /// let raw = Parker::into_raw(p);
178
    /// ```
179
2
    pub fn into_raw(this: Parker) -> *const () {
180
2
        Unparker::into_raw(this.unparker)
181
2
    }
182
183
    /// Converts a raw pointer into a `Parker`.
184
    ///
185
    /// # Safety
186
    ///
187
    /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
188
    ///
189
    /// # Examples
190
    ///
191
    /// ```
192
    /// use crossbeam_utils::sync::Parker;
193
    ///
194
    /// let p = Parker::new();
195
    /// let raw = Parker::into_raw(p);
196
    /// let p = unsafe { Parker::from_raw(raw) };
197
    /// ```
198
1
    pub unsafe fn from_raw(ptr: *const ()) -> Parker {
199
1
        Parker {
200
1
            unparker: Unparker::from_raw(ptr),
201
1
            _marker: PhantomData,
202
1
        }
203
1
    }
204
}
205
206
impl fmt::Debug for Parker {
207
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208
        f.pad("Parker { .. }")
209
    }
210
}
211
212
/// Unparks a thread parked by the associated [`Parker`].
213
pub struct Unparker {
214
    inner: Arc<Inner>,
215
}
216
217
unsafe impl Send for Unparker {}
218
unsafe impl Sync for Unparker {}
219
220
impl Unparker {
221
    /// Atomically makes the token available if it is not already.
222
    ///
223
    /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
224
    /// any.
225
    ///
226
    /// # Examples
227
    ///
228
    /// ```
229
    /// use std::thread;
230
    /// use std::time::Duration;
231
    /// use crossbeam_utils::sync::Parker;
232
    ///
233
    /// let p = Parker::new();
234
    /// let u = p.unparker().clone();
235
    ///
236
    /// thread::spawn(move || {
237
    ///     thread::sleep(Duration::from_millis(500));
238
    ///     u.unpark();
239
    /// });
240
    ///
241
    /// // Wakes up when `u.unpark()` provides the token.
242
    /// p.park();
243
    /// ```
244
    ///
245
    /// [`park`]: Parker::park
246
    /// [`park_timeout`]: Parker::park_timeout
247
25
    pub fn unpark(&self) {
248
25
        self.inner.unpark()
249
25
    }
250
251
    /// Converts an `Unparker` into a raw pointer.
252
    ///
253
    /// # Examples
254
    ///
255
    /// ```
256
    /// use crossbeam_utils::sync::{Parker, Unparker};
257
    ///
258
    /// let p = Parker::new();
259
    /// let u = p.unparker().clone();
260
    /// let raw = Unparker::into_raw(u);
261
    /// ```
262
3
    pub fn into_raw(this: Unparker) -> *const () {
263
3
        Arc::into_raw(this.inner) as *const ()
264
3
    }
265
266
    /// Converts a raw pointer into an `Unparker`.
267
    ///
268
    /// # Safety
269
    ///
270
    /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
271
    ///
272
    /// # Examples
273
    ///
274
    /// ```
275
    /// use crossbeam_utils::sync::{Parker, Unparker};
276
    ///
277
    /// let p = Parker::new();
278
    /// let u = p.unparker().clone();
279
    ///
280
    /// let raw = Unparker::into_raw(u);
281
    /// let u = unsafe { Unparker::from_raw(raw) };
282
    /// ```
283
1
    pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
284
1
        Unparker {
285
1
            inner: Arc::from_raw(ptr as *const Inner),
286
1
        }
287
1
    }
288
}
289
290
impl fmt::Debug for Unparker {
291
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292
        f.pad("Unparker { .. }")
293
    }
294
}
295
296
impl Clone for Unparker {
297
15
    fn clone(&self) -> Unparker {
298
15
        Unparker {
299
15
            inner: self.inner.clone(),
300
15
        }
301
15
    }
302
}
303
304
const EMPTY: usize = 0;
305
const PARKED: usize = 1;
306
const NOTIFIED: usize = 2;
307
308
struct Inner {
309
    state: AtomicUsize,
310
    lock: Mutex<()>,
311
    cvar: Condvar,
312
}
313
314
impl Inner {
315
37
    fn park(&self, deadline: Option<Instant>) {
316
        // If we were previously notified then we consume this notification and return quickly.
317
37
        if self
318
37
            .state
319
37
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
320
37
            .is_ok()
321
        {
322
13
            return;
323
24
        }
324
325
        // If the timeout is zero, then there is no need to actually block.
326
24
        if let Some(
deadline22
) = deadline {
327
22
            if deadline <= Instant::now() {
328
0
                return;
329
22
            }
330
2
        }
331
332
        // Otherwise we need to coordinate going to sleep.
333
24
        let mut m = self.lock.lock().unwrap();
334
24
335
24
        match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
336
24
            Ok(_) => {}
337
            // Consume this notification to avoid spurious wakeups in the next park.
338
            Err(NOTIFIED) => {
339
                // We must read `state` here, even though we know it will be `NOTIFIED`. This is
340
                // because `unpark` may have been called again since we read `NOTIFIED` in the
341
                // `compare_exchange` above. We must perform an acquire operation that synchronizes
342
                // with that `unpark` to observe any writes it made before the call to `unpark`. To
343
                // do that we must read from the write it made to `state`.
344
0
                let old = self.state.swap(EMPTY, SeqCst);
345
0
                assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
346
0
                return;
347
            }
348
0
            Err(n) => panic!("inconsistent park_timeout state: {}", n),
349
        }
350
351
36
        loop {
352
36
            // Block the current thread on the conditional variable.
353
36
            m = match deadline {
354
36
                None => 
self.cvar.wait(m).unwrap()2
,
355
34
                Some(deadline) => {
356
34
                    let now = Instant::now();
357
34
                    if now < deadline {
358
                        // We could check for a timeout here, in the return value of wait_timeout,
359
                        // but in the case that a timeout and an unpark arrive simultaneously, we
360
                        // prefer to report the former.
361
22
                        self.cvar.wait_timeout(m, deadline - now).unwrap().0
362
                    } else {
363
                        // We've timed out; swap out the state back to empty on our way out
364
12
                        match self.state.swap(EMPTY, SeqCst) {
365
12
                            NOTIFIED | PARKED => return,
366
0
                            n => panic!("inconsistent park_timeout state: {}", n),
367
                        };
368
                    }
369
                }
370
            };
371
372
24
            if self
373
24
                .state
374
24
                .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
375
24
                .is_ok()
376
            {
377
                // got a notification
378
12
                return;
379
12
            }
380
381
            // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
382
            // in the branch above, when we discover the deadline is in the past
383
        }
384
37
    }
385
386
25
    pub(crate) fn unpark(&self) {
387
        // To ensure the unparked thread will observe any writes we made before this call, we must
388
        // perform a release operation that `park` can synchronize with. To do that we must write
389
        // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
390
        // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
391
25
        match self.state.swap(NOTIFIED, SeqCst) {
392
13
            EMPTY => return,    // no one was waiting
393
0
            NOTIFIED => return, // already unparked
394
12
            PARKED => {}        // gotta go wake someone up
395
0
            _ => panic!("inconsistent state in unpark"),
396
        }
397
398
        // There is a period between when the parked thread sets `state` to `PARKED` (or last
399
        // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
400
        // If we were to notify during this period it would be ignored and then when the parked
401
        // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
402
        // stage so we can acquire `lock` to wait until it is ready to receive the notification.
403
        //
404
        // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
405
        // it doesn't get woken only to have to wait for us to release `lock`.
406
12
        drop(self.lock.lock().unwrap());
407
12
        self.cvar.notify_one();
408
25
    }
409
}