Coverage Report

Created: 2021-01-22 16:54

crossbeam-channel/src/flavors/tick.rs
Line
Count
Source (jump to first uncovered line)
1
//! Channel that delivers messages periodically.
2
//!
3
//! Messages cannot be sent into this kind of channel; they are materialized on demand.
4
5
use std::thread;
6
use std::time::{Duration, Instant};
7
8
use crossbeam_utils::atomic::AtomicCell;
9
10
use crate::context::Context;
11
use crate::err::{RecvTimeoutError, TryRecvError};
12
use crate::select::{Operation, SelectHandle, Token};
13
14
/// Result of a receive operation.
15
pub(crate) type TickToken = Option<Instant>;
16
17
/// Channel that delivers messages periodically.
18
pub(crate) struct Channel {
19
    /// The instant at which the next message will be delivered.
20
    delivery_time: AtomicCell<Instant>,
21
22
    /// The time interval in which messages get delivered.
23
    duration: Duration,
24
}
25
26
impl Channel {
27
    /// Creates a channel that delivers messages periodically.
28
    #[inline]
29
30.2k
    pub(crate) fn new(dur: Duration) -> Self {
30
30.2k
        Channel {
31
30.2k
            delivery_time: AtomicCell::new(Instant::now() + dur),
32
30.2k
            duration: dur,
33
30.2k
        }
34
30.2k
    }
35
36
    /// Attempts to receive a message without blocking.
37
    #[inline]
38
17.1k
    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39
17.1k
        loop {
40
17.1k
            let now = Instant::now();
41
17.1k
            let delivery_time = self.delivery_time.load();
42
17.1k
43
17.1k
            if now < delivery_time {
44
5.79k
                return Err(TryRecvError::Empty);
45
11.3k
            }
46
11.3k
47
11.3k
            if self
48
11.3k
                .delivery_time
49
11.3k
                .compare_exchange(delivery_time, now + self.duration)
50
11.3k
                .is_ok()
51
            {
52
11.3k
                return Ok(delivery_time);
53
9
            }
54
        }
55
17.1k
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv
<crossbeam_channel::flavors::tick::Channel>::try_recv
Line
Count
Source
38
2.58k
    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39
2.58k
        loop {
40
2.58k
            let now = Instant::now();
41
2.58k
            let delivery_time = self.delivery_time.load();
42
2.58k
43
2.58k
            if now < delivery_time {
44
0
                return Err(TryRecvError::Empty);
45
2.58k
            }
46
2.58k
47
2.58k
            if self
48
2.58k
                .delivery_time
49
2.58k
                .compare_exchange(delivery_time, now + self.duration)
50
2.58k
                .is_ok()
51
            {
52
2.58k
                return Ok(delivery_time);
53
0
            }
54
        }
55
2.58k
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv
<crossbeam_channel::flavors::tick::Channel>::try_recv
Line
Count
Source
38
9.39k
    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39
9.40k
        loop {
40
9.40k
            let now = Instant::now();
41
9.40k
            let delivery_time = self.delivery_time.load();
42
9.40k
43
9.40k
            if now < delivery_time {
44
5.79k
                return Err(TryRecvError::Empty);
45
3.60k
            }
46
3.60k
47
3.60k
            if self
48
3.60k
                .delivery_time
49
3.60k
                .compare_exchange(delivery_time, now + self.duration)
50
3.60k
                .is_ok()
51
            {
52
3.62k
                return Ok(delivery_time);
53
9
            }
54
        }
55
9.41k
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv
<crossbeam_channel::flavors::tick::Channel>::try_recv
Line
Count
Source
38
2.58k
    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39
2.58k
        loop {
40
2.58k
            let now = Instant::now();
41
2.58k
            let delivery_time = self.delivery_time.load();
42
2.58k
43
2.58k
            if now < delivery_time {
44
0
                return Err(TryRecvError::Empty);
45
2.58k
            }
46
2.58k
47
2.58k
            if self
48
2.58k
                .delivery_time
49
2.58k
                .compare_exchange(delivery_time, now + self.duration)
50
2.58k
                .is_ok()
51
            {
52
2.58k
                return Ok(delivery_time);
53
0
            }
54
        }
55
2.58k
    }
<crossbeam_channel::flavors::tick::Channel>::try_recv
Line
Count
Source
38
2.58k
    pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
39
2.58k
        loop {
40
2.58k
            let now = Instant::now();
41
2.58k
            let delivery_time = self.delivery_time.load();
42
2.58k
43
2.58k
            if now < delivery_time {
44
0
                return Err(TryRecvError::Empty);
45
2.58k
            }
46
2.58k
47
2.58k
            if self
48
2.58k
                .delivery_time
49
2.58k
                .compare_exchange(delivery_time, now + self.duration)
50
2.58k
                .is_ok()
51
            {
52
2.58k
                return Ok(delivery_time);
53
0
            }
54
        }
55
2.58k
    }
56
57
    /// Receives a message from the channel.
58
    #[inline]
59
31
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60
31
        loop {
61
31
            let delivery_time = self.delivery_time.load();
62
31
            let now = Instant::now();
63
64
31
            if let Some(d) = deadline {
65
14
                if d < delivery_time {
66
2
                    if now < d {
67
2
                        thread::sleep(d - now);
68
2
                    }
0
69
2
                    return Err(RecvTimeoutError::Timeout);
70
12
                }
71
17
            }
72
73
29
            if self
74
29
                .delivery_time
75
29
                .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76
29
                .is_ok()
77
            {
78
29
                if now < delivery_time {
79
21
                    thread::sleep(delivery_time - now);
80
21
                }
8
81
29
                return Ok(delivery_time);
82
0
            }
83
        }
84
31
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
<crossbeam_channel::flavors::tick::Channel>::recv
Line
Count
Source
59
10
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60
10
        loop {
61
10
            let delivery_time = self.delivery_time.load();
62
10
            let now = Instant::now();
63
64
10
            if let Some(d) = deadline {
65
10
                if d < delivery_time {
66
0
                    if now < d {
67
0
                        thread::sleep(d - now);
68
0
                    }
69
0
                    return Err(RecvTimeoutError::Timeout);
70
10
                }
71
0
            }
72
73
10
            if self
74
10
                .delivery_time
75
10
                .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76
10
                .is_ok()
77
            {
78
10
                if now < delivery_time {
79
10
                    thread::sleep(delivery_time - now);
80
10
                }
0
81
10
                return Ok(delivery_time);
82
0
            }
83
        }
84
10
    }
<crossbeam_channel::flavors::tick::Channel>::recv
Line
Count
Source
59
7
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60
7
        loop {
61
7
            let delivery_time = self.delivery_time.load();
62
7
            let now = Instant::now();
63
64
7
            if let Some(d) = deadline {
65
4
                if d < delivery_time {
66
2
                    if now < d {
67
2
                        thread::sleep(d - now);
68
2
                    }
0
69
2
                    return Err(RecvTimeoutError::Timeout);
70
2
                }
71
3
            }
72
73
5
            if self
74
5
                .delivery_time
75
5
                .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76
5
                .is_ok()
77
            {
78
5
                if now < delivery_time {
79
5
                    thread::sleep(delivery_time - now);
80
5
                }
0
81
5
                return Ok(delivery_time);
82
0
            }
83
        }
84
7
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
<crossbeam_channel::flavors::tick::Channel>::recv
Line
Count
Source
59
14
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
60
14
        loop {
61
14
            let delivery_time = self.delivery_time.load();
62
14
            let now = Instant::now();
63
64
14
            if let Some(d) = deadline {
65
0
                if d < delivery_time {
66
0
                    if now < d {
67
0
                        thread::sleep(d - now);
68
0
                    }
69
0
                    return Err(RecvTimeoutError::Timeout);
70
0
                }
71
14
            }
72
73
14
            if self
74
14
                .delivery_time
75
14
                .compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
76
14
                .is_ok()
77
            {
78
14
                if now < delivery_time {
79
6
                    thread::sleep(delivery_time - now);
80
8
                }
81
14
                return Ok(delivery_time);
82
0
            }
83
        }
84
14
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv
85
86
    /// Reads a message from the channel.
87
    #[inline]
88
8.74k
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89
8.74k
        token.tick.ok_or(())
90
8.74k
    }
<crossbeam_channel::flavors::tick::Channel>::read
Line
Count
Source
88
2.58k
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89
2.58k
        token.tick.ok_or(())
90
2.58k
    }
<crossbeam_channel::flavors::tick::Channel>::read
Line
Count
Source
88
3.57k
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89
3.57k
        token.tick.ok_or(())
90
3.57k
    }
<crossbeam_channel::flavors::tick::Channel>::read
Line
Count
Source
88
2.58k
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
89
2.58k
        token.tick.ok_or(())
90
2.58k
    }
91
92
    /// Returns `true` if the channel is empty.
93
    #[inline]
94
8.99k
    pub(crate) fn is_empty(&self) -> bool {
95
8.99k
        Instant::now() < self.delivery_time.load()
96
8.99k
    }
<crossbeam_channel::flavors::tick::Channel>::is_empty
Line
Count
Source
94
6.41k
    pub(crate) fn is_empty(&self) -> bool {
95
6.41k
        Instant::now() < self.delivery_time.load()
96
6.41k
    }
<crossbeam_channel::flavors::tick::Channel>::is_empty
Line
Count
Source
94
2.58k
    pub(crate) fn is_empty(&self) -> bool {
95
2.58k
        Instant::now() < self.delivery_time.load()
96
2.58k
    }
97
98
    /// Returns `true` if the channel is full.
99
    #[inline]
100
3
    pub(crate) fn is_full(&self) -> bool {
101
3
        !self.is_empty()
102
3
    }
103
104
    /// Returns the number of messages in the channel.
105
    #[inline]
106
3
    pub(crate) fn len(&self) -> usize {
107
3
        if self.is_empty() {
108
2
            0
109
        } else {
110
1
            1
111
        }
112
3
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len
<crossbeam_channel::flavors::tick::Channel>::len
Line
Count
Source
106
3
    pub(crate) fn len(&self) -> usize {
107
3
        if self.is_empty() {
108
2
            0
109
        } else {
110
1
            1
111
        }
112
3
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len
113
114
    /// Returns the capacity of the channel.
115
    #[inline]
116
10
    pub(crate) fn capacity(&self) -> Option<usize> {
117
10
        Some(1)
118
10
    }
119
}
120
121
impl SelectHandle for Channel {
122
    #[inline]
123
14.4k
    fn try_select(&self, token: &mut Token) -> bool {
124
14.4k
        match self.try_recv() {
125
8.70k
            Ok(msg) => {
126
8.70k
                token.tick = Some(msg);
127
8.70k
                true
128
            }
129
5.76k
            Err(TryRecvError::Disconnected) => {
130
0
                token.tick = None;
131
0
                true
132
            }
133
5.76k
            Err(TryRecvError::Empty) => false,
134
        }
135
14.4k
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
<crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
Line
Count
Source
123
2.58k
    fn try_select(&self, token: &mut Token) -> bool {
124
2.58k
        match self.try_recv() {
125
2.58k
            Ok(msg) => {
126
2.58k
                token.tick = Some(msg);
127
2.58k
                true
128
            }
129
0
            Err(TryRecvError::Disconnected) => {
130
0
                token.tick = None;
131
0
                true
132
            }
133
0
            Err(TryRecvError::Empty) => false,
134
        }
135
2.58k
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
<crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
Line
Count
Source
123
9.30k
    fn try_select(&self, token: &mut Token) -> bool {
124
9.30k
        match self.try_recv() {
125
3.53k
            Ok(msg) => {
126
3.53k
                token.tick = Some(msg);
127
3.53k
                true
128
            }
129
5.76k
            Err(TryRecvError::Disconnected) => {
130
0
                token.tick = None;
131
0
                true
132
            }
133
5.76k
            Err(TryRecvError::Empty) => false,
134
        }
135
9.30k
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
<crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
Line
Count
Source
123
2.58k
    fn try_select(&self, token: &mut Token) -> bool {
124
2.58k
        match self.try_recv() {
125
2.58k
            Ok(msg) => {
126
2.58k
                token.tick = Some(msg);
127
2.58k
                true
128
            }
129
0
            Err(TryRecvError::Disconnected) => {
130
0
                token.tick = None;
131
0
                true
132
            }
133
0
            Err(TryRecvError::Empty) => false,
134
        }
135
2.58k
    }
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select
136
137
    #[inline]
138
5.58k
    fn deadline(&self) -> Option<Instant> {
139
5.58k
        Some(self.delivery_time.load())
140
5.58k
    }
141
142
    #[inline]
143
5.49k
    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
144
5.49k
        self.is_ready()
145
5.49k
    }
146
147
    #[inline]
148
5.48k
    fn unregister(&self, _oper: Operation) {}
149
150
    #[inline]
151
    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
152
        self.try_select(token)
153
    }
154
155
    #[inline]
156
8.99k
    fn is_ready(&self) -> bool {
157
8.99k
        !self.is_empty()
158
8.99k
    }
<crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::is_ready
Line
Count
Source
156
6.40k
    fn is_ready(&self) -> bool {
157
6.40k
        !self.is_empty()
158
6.40k
    }
<crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::is_ready
Line
Count
Source
156
2.58k
    fn is_ready(&self) -> bool {
157
2.58k
        !self.is_empty()
158
2.58k
    }
159
160
    #[inline]
161
71
    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
162
71
        self.is_ready()
163
71
    }
164
165
    #[inline]
166
67
    fn unwatch(&self, _oper: Operation) {}
167
}