crossbeam-channel/src/flavors/tick.rs
Line | Count | Source (jump to first uncovered line) |
1 | | //! Channel that delivers messages periodically. |
2 | | //! |
3 | | //! Messages cannot be sent into this kind of channel; they are materialized on demand. |
4 | | |
5 | | use std::thread; |
6 | | use std::time::{Duration, Instant}; |
7 | | |
8 | | use crossbeam_utils::atomic::AtomicCell; |
9 | | |
10 | | use crate::context::Context; |
11 | | use crate::err::{RecvTimeoutError, TryRecvError}; |
12 | | use crate::select::{Operation, SelectHandle, Token}; |
13 | | |
14 | | /// Result of a receive operation. |
15 | | pub(crate) type TickToken = Option<Instant>; |
16 | | |
17 | | /// Channel that delivers messages periodically. |
18 | | pub(crate) struct Channel { |
19 | | /// The instant at which the next message will be delivered. |
20 | | delivery_time: AtomicCell<Instant>, |
21 | | |
22 | | /// The time interval in which messages get delivered. |
23 | | duration: Duration, |
24 | | } |
25 | | |
26 | | impl Channel { |
27 | | /// Creates a channel that delivers messages periodically. |
28 | | #[inline] |
29 | 30.2k | pub(crate) fn new(dur: Duration) -> Self { |
30 | 30.2k | Channel { |
31 | 30.2k | delivery_time: AtomicCell::new(Instant::now() + dur), |
32 | 30.2k | duration: dur, |
33 | 30.2k | } |
34 | 30.2k | } |
35 | | |
36 | | /// Attempts to receive a message without blocking. |
37 | | #[inline] |
38 | 17.1k | pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { |
39 | 17.1k | loop { |
40 | 17.1k | let now = Instant::now(); |
41 | 17.1k | let delivery_time = self.delivery_time.load(); |
42 | 17.1k | |
43 | 17.1k | if now < delivery_time { |
44 | 5.79k | return Err(TryRecvError::Empty); |
45 | 11.3k | } |
46 | 11.3k | |
47 | 11.3k | if self |
48 | 11.3k | .delivery_time |
49 | 11.3k | .compare_exchange(delivery_time, now + self.duration) |
50 | 11.3k | .is_ok() |
51 | | { |
52 | 11.3k | return Ok(delivery_time); |
53 | 9 | } |
54 | | } |
55 | 17.1k | } Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv <crossbeam_channel::flavors::tick::Channel>::try_recv Line | Count | Source | 38 | 2.58k | pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { | 39 | 2.58k | loop { | 40 | 2.58k | let now = Instant::now(); | 41 | 2.58k | let delivery_time = self.delivery_time.load(); | 42 | 2.58k | | 43 | 2.58k | if now < delivery_time { | 44 | 0 | return Err(TryRecvError::Empty); | 45 | 2.58k | } | 46 | 2.58k | | 47 | 2.58k | if self | 48 | 2.58k | .delivery_time | 49 | 2.58k | .compare_exchange(delivery_time, now + self.duration) | 50 | 2.58k | .is_ok() | 51 | | { | 52 | 2.58k | return Ok(delivery_time); | 53 | 0 | } | 54 | | } | 55 | 2.58k | } |
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv <crossbeam_channel::flavors::tick::Channel>::try_recv Line | Count | Source | 38 | 9.39k | pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { | 39 | 9.40k | loop { | 40 | 9.40k | let now = Instant::now(); | 41 | 9.40k | let delivery_time = self.delivery_time.load(); | 42 | 9.40k | | 43 | 9.40k | if now < delivery_time { | 44 | 5.79k | return Err(TryRecvError::Empty); | 45 | 3.60k | } | 46 | 3.60k | | 47 | 3.60k | if self | 48 | 3.60k | .delivery_time | 49 | 3.60k | .compare_exchange(delivery_time, now + self.duration) | 50 | 3.60k | .is_ok() | 51 | | { | 52 | 3.62k | return Ok(delivery_time); | 53 | 9 | } | 54 | | } | 55 | 9.41k | } |
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::try_recv <crossbeam_channel::flavors::tick::Channel>::try_recv Line | Count | Source | 38 | 2.58k | pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { | 39 | 2.58k | loop { | 40 | 2.58k | let now = Instant::now(); | 41 | 2.58k | let delivery_time = self.delivery_time.load(); | 42 | 2.58k | | 43 | 2.58k | if now < delivery_time { | 44 | 0 | return Err(TryRecvError::Empty); | 45 | 2.58k | } | 46 | 2.58k | | 47 | 2.58k | if self | 48 | 2.58k | .delivery_time | 49 | 2.58k | .compare_exchange(delivery_time, now + self.duration) | 50 | 2.58k | .is_ok() | 51 | | { | 52 | 2.58k | return Ok(delivery_time); | 53 | 0 | } | 54 | | } | 55 | 2.58k | } |
<crossbeam_channel::flavors::tick::Channel>::try_recv Line | Count | Source | 38 | 2.58k | pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> { | 39 | 2.58k | loop { | 40 | 2.58k | let now = Instant::now(); | 41 | 2.58k | let delivery_time = self.delivery_time.load(); | 42 | 2.58k | | 43 | 2.58k | if now < delivery_time { | 44 | 0 | return Err(TryRecvError::Empty); | 45 | 2.58k | } | 46 | 2.58k | | 47 | 2.58k | if self | 48 | 2.58k | .delivery_time | 49 | 2.58k | .compare_exchange(delivery_time, now + self.duration) | 50 | 2.58k | .is_ok() | 51 | | { | 52 | 2.58k | return Ok(delivery_time); | 53 | 0 | } | 54 | | } | 55 | 2.58k | } |
|
56 | | |
57 | | /// Receives a message from the channel. |
58 | | #[inline] |
59 | 31 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { |
60 | 31 | loop { |
61 | 31 | let delivery_time = self.delivery_time.load(); |
62 | 31 | let now = Instant::now(); |
63 | | |
64 | 31 | if let Some(d14 ) = deadline { |
65 | 14 | if d < delivery_time { |
66 | 2 | if now < d { |
67 | 2 | thread::sleep(d - now); |
68 | 2 | }0 |
69 | 2 | return Err(RecvTimeoutError::Timeout); |
70 | 12 | } |
71 | 17 | } |
72 | | |
73 | 29 | if self |
74 | 29 | .delivery_time |
75 | 29 | .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) |
76 | 29 | .is_ok() |
77 | | { |
78 | 29 | if now < delivery_time { |
79 | 21 | thread::sleep(delivery_time - now); |
80 | 21 | }8 |
81 | 29 | return Ok(delivery_time); |
82 | 0 | } |
83 | | } |
84 | 31 | } Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv <crossbeam_channel::flavors::tick::Channel>::recv Line | Count | Source | 59 | 10 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { | 60 | 10 | loop { | 61 | 10 | let delivery_time = self.delivery_time.load(); | 62 | 10 | let now = Instant::now(); | 63 | | | 64 | 10 | if let Some(d) = deadline { | 65 | 10 | if d < delivery_time { | 66 | 0 | if now < d { | 67 | 0 | thread::sleep(d - now); | 68 | 0 | } | 69 | 0 | return Err(RecvTimeoutError::Timeout); | 70 | 10 | } | 71 | 0 | } | 72 | | | 73 | 10 | if self | 74 | 10 | .delivery_time | 75 | 10 | .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) | 76 | 10 | .is_ok() | 77 | | { | 78 | 10 | if now < delivery_time { | 79 | 10 | thread::sleep(delivery_time - now); | 80 | 10 | }0 | 81 | 10 | return Ok(delivery_time); | 82 | 0 | } | 83 | | } | 84 | 10 | } |
<crossbeam_channel::flavors::tick::Channel>::recv Line | Count | Source | 59 | 7 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { | 60 | 7 | loop { | 61 | 7 | let delivery_time = self.delivery_time.load(); | 62 | 7 | let now = Instant::now(); | 63 | | | 64 | 7 | if let Some(d4 ) = deadline { | 65 | 4 | if d < delivery_time { | 66 | 2 | if now < d { | 67 | 2 | thread::sleep(d - now); | 68 | 2 | }0 | 69 | 2 | return Err(RecvTimeoutError::Timeout); | 70 | 2 | } | 71 | 3 | } | 72 | | | 73 | 5 | if self | 74 | 5 | .delivery_time | 75 | 5 | .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) | 76 | 5 | .is_ok() | 77 | | { | 78 | 5 | if now < delivery_time { | 79 | 5 | thread::sleep(delivery_time - now); | 80 | 5 | }0 | 81 | 5 | return Ok(delivery_time); | 82 | 0 | } | 83 | | } | 84 | 7 | } |
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv <crossbeam_channel::flavors::tick::Channel>::recv Line | Count | Source | 59 | 14 | pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> { | 60 | 14 | loop { | 61 | 14 | let delivery_time = self.delivery_time.load(); | 62 | 14 | let now = Instant::now(); | 63 | | | 64 | 14 | if let Some(d0 ) = deadline { | 65 | 0 | if d < delivery_time { | 66 | 0 | if now < d { | 67 | 0 | thread::sleep(d - now); | 68 | 0 | } | 69 | 0 | return Err(RecvTimeoutError::Timeout); | 70 | 0 | } | 71 | 14 | } | 72 | | | 73 | 14 | if self | 74 | 14 | .delivery_time | 75 | 14 | .compare_exchange(delivery_time, delivery_time.max(now) + self.duration) | 76 | 14 | .is_ok() | 77 | | { | 78 | 14 | if now < delivery_time { | 79 | 6 | thread::sleep(delivery_time - now); | 80 | 8 | } | 81 | 14 | return Ok(delivery_time); | 82 | 0 | } | 83 | | } | 84 | 14 | } |
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::recv |
85 | | |
86 | | /// Reads a message from the channel. |
87 | | #[inline] |
88 | 8.74k | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { |
89 | 8.74k | token.tick.ok_or(()) |
90 | 8.74k | } <crossbeam_channel::flavors::tick::Channel>::read Line | Count | Source | 88 | 2.58k | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { | 89 | 2.58k | token.tick.ok_or(()) | 90 | 2.58k | } |
<crossbeam_channel::flavors::tick::Channel>::read Line | Count | Source | 88 | 3.57k | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { | 89 | 3.57k | token.tick.ok_or(()) | 90 | 3.57k | } |
<crossbeam_channel::flavors::tick::Channel>::read Line | Count | Source | 88 | 2.58k | pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> { | 89 | 2.58k | token.tick.ok_or(()) | 90 | 2.58k | } |
|
91 | | |
92 | | /// Returns `true` if the channel is empty. |
93 | | #[inline] |
94 | 8.99k | pub(crate) fn is_empty(&self) -> bool { |
95 | 8.99k | Instant::now() < self.delivery_time.load() |
96 | 8.99k | } <crossbeam_channel::flavors::tick::Channel>::is_empty Line | Count | Source | 94 | 6.41k | pub(crate) fn is_empty(&self) -> bool { | 95 | 6.41k | Instant::now() < self.delivery_time.load() | 96 | 6.41k | } |
<crossbeam_channel::flavors::tick::Channel>::is_empty Line | Count | Source | 94 | 2.58k | pub(crate) fn is_empty(&self) -> bool { | 95 | 2.58k | Instant::now() < self.delivery_time.load() | 96 | 2.58k | } |
|
97 | | |
98 | | /// Returns `true` if the channel is full. |
99 | | #[inline] |
100 | 3 | pub(crate) fn is_full(&self) -> bool { |
101 | 3 | !self.is_empty() |
102 | 3 | } |
103 | | |
104 | | /// Returns the number of messages in the channel. |
105 | | #[inline] |
106 | 3 | pub(crate) fn len(&self) -> usize { |
107 | 3 | if self.is_empty() { |
108 | 2 | 0 |
109 | | } else { |
110 | 1 | 1 |
111 | | } |
112 | 3 | } Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len <crossbeam_channel::flavors::tick::Channel>::len Line | Count | Source | 106 | 3 | pub(crate) fn len(&self) -> usize { | 107 | 3 | if self.is_empty() { | 108 | 2 | 0 | 109 | | } else { | 110 | 1 | 1 | 111 | | } | 112 | 3 | } |
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel>::len |
113 | | |
114 | | /// Returns the capacity of the channel. |
115 | | #[inline] |
116 | 10 | pub(crate) fn capacity(&self) -> Option<usize> { |
117 | 10 | Some(1) |
118 | 10 | } |
119 | | } |
120 | | |
121 | | impl SelectHandle for Channel { |
122 | | #[inline] |
123 | 14.4k | fn try_select(&self, token: &mut Token) -> bool { |
124 | 14.4k | match self.try_recv() { |
125 | 8.70k | Ok(msg) => { |
126 | 8.70k | token.tick = Some(msg); |
127 | 8.70k | true |
128 | | } |
129 | 5.76k | Err(TryRecvError::Disconnected) => { |
130 | 0 | token.tick = None; |
131 | 0 | true |
132 | | } |
133 | 5.76k | Err(TryRecvError::Empty) => false, |
134 | | } |
135 | 14.4k | } Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select Line | Count | Source | 123 | 2.58k | fn try_select(&self, token: &mut Token) -> bool { | 124 | 2.58k | match self.try_recv() { | 125 | 2.58k | Ok(msg) => { | 126 | 2.58k | token.tick = Some(msg); | 127 | 2.58k | true | 128 | | } | 129 | 0 | Err(TryRecvError::Disconnected) => { | 130 | 0 | token.tick = None; | 131 | 0 | true | 132 | | } | 133 | 0 | Err(TryRecvError::Empty) => false, | 134 | | } | 135 | 2.58k | } |
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select Line | Count | Source | 123 | 9.30k | fn try_select(&self, token: &mut Token) -> bool { | 124 | 9.30k | match self.try_recv() { | 125 | 3.53k | Ok(msg) => { | 126 | 3.53k | token.tick = Some(msg); | 127 | 3.53k | true | 128 | | } | 129 | 5.76k | Err(TryRecvError::Disconnected) => { | 130 | 0 | token.tick = None; | 131 | 0 | true | 132 | | } | 133 | 5.76k | Err(TryRecvError::Empty) => false, | 134 | | } | 135 | 9.30k | } |
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select Line | Count | Source | 123 | 2.58k | fn try_select(&self, token: &mut Token) -> bool { | 124 | 2.58k | match self.try_recv() { | 125 | 2.58k | Ok(msg) => { | 126 | 2.58k | token.tick = Some(msg); | 127 | 2.58k | true | 128 | | } | 129 | 0 | Err(TryRecvError::Disconnected) => { | 130 | 0 | token.tick = None; | 131 | 0 | true | 132 | | } | 133 | 0 | Err(TryRecvError::Empty) => false, | 134 | | } | 135 | 2.58k | } |
Unexecuted instantiation: <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::try_select |
136 | | |
137 | | #[inline] |
138 | 5.58k | fn deadline(&self) -> Option<Instant> { |
139 | 5.58k | Some(self.delivery_time.load()) |
140 | 5.58k | } |
141 | | |
142 | | #[inline] |
143 | 5.49k | fn register(&self, _oper: Operation, _cx: &Context) -> bool { |
144 | 5.49k | self.is_ready() |
145 | 5.49k | } |
146 | | |
147 | | #[inline] |
148 | 5.48k | fn unregister(&self, _oper: Operation) {} |
149 | | |
150 | | #[inline] |
151 | | fn accept(&self, token: &mut Token, _cx: &Context) -> bool { |
152 | | self.try_select(token) |
153 | | } |
154 | | |
155 | | #[inline] |
156 | 8.99k | fn is_ready(&self) -> bool { |
157 | 8.99k | !self.is_empty() |
158 | 8.99k | } <crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::is_ready Line | Count | Source | 156 | 6.40k | fn is_ready(&self) -> bool { | 157 | 6.40k | !self.is_empty() | 158 | 6.40k | } |
<crossbeam_channel::flavors::tick::Channel as crossbeam_channel::select::SelectHandle>::is_ready Line | Count | Source | 156 | 2.58k | fn is_ready(&self) -> bool { | 157 | 2.58k | !self.is_empty() | 158 | 2.58k | } |
|
159 | | |
160 | | #[inline] |
161 | 71 | fn watch(&self, _oper: Operation, _cx: &Context) -> bool { |
162 | 71 | self.is_ready() |
163 | 71 | } |
164 | | |
165 | | #[inline] |
166 | 67 | fn unwatch(&self, _oper: Operation) {} |
167 | | } |