crossbeam-channel/src/lib.rs
Line | Count | Source |
1 | 1 | //! Multi-producer multi-consumer channels for message passing.//! Multi-producer multi-consumer channels for message passing. |
2 | | //! |
3 | | //! This crate is an alternative to [`std::sync::mpsc`] with more features and better performance. |
4 | | //! |
5 | | //! # Hello, world! |
6 | | //! |
7 | | //! ``` |
8 | | //! use crossbeam_channel::unbounded; |
9 | | //! |
10 | | //! // Create a channel of unbounded capacity. |
11 | | //! let (s, r) = unbounded(); |
12 | | //! |
13 | | //! // Send a message into the channel. |
14 | | //! s.send("Hello, world!").unwrap(); |
15 | | //! |
16 | | //! // Receive the message from the channel. |
17 | | //! assert_eq!(r.recv(), Ok("Hello, world!")); |
18 | | //! ``` |
19 | | //! |
20 | | //! # Channel types |
21 | | //! |
22 | | //! Channels can be created using two functions: |
23 | | //! |
24 | | //! * [`bounded`] creates a channel of bounded capacity, i.e. there is a limit to how many messages |
25 | | //! it can hold at a time. |
26 | | //! |
27 | | //! * [`unbounded`] creates a channel of unbounded capacity, i.e. it can hold any number of |
28 | | //! messages at a time. |
29 | | //! |
30 | | //! Both functions return a [`Sender`] and a [`Receiver`], which represent the two opposite sides |
31 | | //! of a channel. |
32 | | //! |
33 | | //! Creating a bounded channel: |
34 | | //! |
35 | | //! ``` |
36 | | //! use crossbeam_channel::bounded; |
37 | | //! |
38 | | //! // Create a channel that can hold at most 5 messages at a time. |
39 | | //! let (s, r) = bounded(5); |
40 | | //! |
41 | | //! // Can send only 5 messages without blocking. |
42 | | //! for i in 0..5 { |
43 | | //! s.send(i).unwrap(); |
44 | | //! } |
45 | | //! |
46 | | //! // Another call to `send` would block because the channel is full. |
47 | | //! // s.send(5).unwrap(); |
48 | | //! ``` |
49 | | //! |
50 | | //! Creating an unbounded channel: |
51 | | //! |
52 | | //! ``` |
53 | | //! use crossbeam_channel::unbounded; |
54 | | //! |
55 | | //! // Create an unbounded channel. |
56 | | //! let (s, r) = unbounded(); |
57 | | //! |
58 | | //! // Can send any number of messages into the channel without blocking. |
59 | | //! for i in 0..1000 { |
60 | | //! s.send(i).unwrap(); |
61 | | //! } |
62 | | //! ``` |
63 | | //! |
64 | | //! A special case is zero-capacity channel, which cannot hold any messages. Instead, send and |
65 | | //! receive operations must appear at the same time in order to pair up and pass the message over: |
66 | | //! |
67 | | //! ``` |
68 | | //! use std::thread; |
69 | | //! use crossbeam_channel::bounded; |
70 | | //! |
71 | | //! // Create a zero-capacity channel. |
72 | | //! let (s, r) = bounded(0); |
73 | | //! |
74 | | //! // Sending blocks until a receive operation appears on the other side. |
75 | | //! thread::spawn(move || s.send("Hi!").unwrap()); |
76 | | //! |
77 | | //! // Receiving blocks until a send operation appears on the other side. |
78 | | //! assert_eq!(r.recv(), Ok("Hi!")); |
79 | | //! ``` |
80 | | //! |
81 | | //! # Sharing channels |
82 | | //! |
83 | | //! Senders and receivers can be cloned and sent to other threads: |
84 | | //! |
85 | | //! ``` |
86 | | //! use std::thread; |
87 | | //! use crossbeam_channel::bounded; |
88 | | //! |
89 | | //! let (s1, r1) = bounded(0); |
90 | | //! let (s2, r2) = (s1.clone(), r1.clone()); |
91 | | //! |
92 | | //! // Spawn a thread that receives a message and then sends one. |
93 | | //! thread::spawn(move || { |
94 | | //! r2.recv().unwrap(); |
95 | | //! s2.send(2).unwrap(); |
96 | | //! }); |
97 | | //! |
98 | | //! // Send a message and then receive one. |
99 | | //! s1.send(1).unwrap(); |
100 | | //! r1.recv().unwrap(); |
101 | | //! ``` |
102 | | //! |
103 | | //! Note that cloning only creates a new handle to the same sending or receiving side. It does not |
104 | | //! create a separate stream of messages in any way: |
105 | | //! |
106 | | //! ``` |
107 | | //! use crossbeam_channel::unbounded; |
108 | | //! |
109 | | //! let (s1, r1) = unbounded(); |
110 | | //! let (s2, r2) = (s1.clone(), r1.clone()); |
111 | | //! let (s3, r3) = (s2.clone(), r2.clone()); |
112 | | //! |
113 | | //! s1.send(10).unwrap(); |
114 | | //! s2.send(20).unwrap(); |
115 | | //! s3.send(30).unwrap(); |
116 | | //! |
117 | | //! assert_eq!(r3.recv(), Ok(10)); |
118 | | //! assert_eq!(r1.recv(), Ok(20)); |
119 | | //! assert_eq!(r2.recv(), Ok(30)); |
120 | | //! ``` |
121 | | //! |
122 | | //! It's also possible to share senders and receivers by reference: |
123 | | //! |
124 | | //! ``` |
125 | | //! use crossbeam_channel::bounded; |
126 | | //! use crossbeam_utils::thread::scope; |
127 | | //! |
128 | | //! let (s, r) = bounded(0); |
129 | | //! |
130 | | //! scope(|scope| { |
131 | | //! // Spawn a thread that receives a message and then sends one. |
132 | | //! scope.spawn(|_| { |
133 | | //! r.recv().unwrap(); |
134 | | //! s.send(2).unwrap(); |
135 | | //! }); |
136 | | //! |
137 | | //! // Send a message and then receive one. |
138 | | //! s.send(1).unwrap(); |
139 | | //! r.recv().unwrap(); |
140 | | //! }).unwrap(); |
141 | | //! ``` |
142 | | //! |
143 | | //! # Disconnection |
144 | | //! |
145 | | //! When all senders or all receivers associated with a channel get dropped, the channel becomes |
146 | | //! disconnected. No more messages can be sent, but any remaining messages can still be received. |
147 | | //! Send and receive operations on a disconnected channel never block. |
148 | | //! |
149 | | //! ``` |
150 | | //! use crossbeam_channel::{unbounded, RecvError}; |
151 | | //! |
152 | | //! let (s, r) = unbounded(); |
153 | | //! s.send(1).unwrap(); |
154 | | //! s.send(2).unwrap(); |
155 | | //! s.send(3).unwrap(); |
156 | | //! |
157 | | //! // The only sender is dropped, disconnecting the channel. |
158 | | //! drop(s); |
159 | | //! |
160 | | //! // The remaining messages can be received. |
161 | | //! assert_eq!(r.recv(), Ok(1)); |
162 | | //! assert_eq!(r.recv(), Ok(2)); |
163 | | //! assert_eq!(r.recv(), Ok(3)); |
164 | | //! |
165 | | //! // There are no more messages in the channel. |
166 | | //! assert!(r.is_empty()); |
167 | | //! |
168 | | //! // Note that calling `r.recv()` does not block. |
169 | | //! // Instead, `Err(RecvError)` is returned immediately. |
170 | | //! assert_eq!(r.recv(), Err(RecvError)); |
171 | | //! ``` |
172 | | //! |
173 | | //! # Blocking operations |
174 | | //! |
175 | | //! Send and receive operations come in three flavors: |
176 | | //! |
177 | | //! * Non-blocking (returns immediately with success or failure). |
178 | | //! * Blocking (waits until the operation succeeds or the channel becomes disconnected). |
179 | | //! * Blocking with a timeout (blocks only for a certain duration of time). |
180 | | //! |
181 | | //! A simple example showing the difference between non-blocking and blocking operations: |
182 | | //! |
183 | | //! ``` |
184 | | //! use crossbeam_channel::{bounded, RecvError, TryRecvError}; |
185 | | //! |
186 | | //! let (s, r) = bounded(1); |
187 | | //! |
188 | | //! // Send a message into the channel. |
189 | | //! s.send("foo").unwrap(); |
190 | | //! |
191 | | //! // This call would block because the channel is full. |
192 | | //! // s.send("bar").unwrap(); |
193 | | //! |
194 | | //! // Receive the message. |
195 | | //! assert_eq!(r.recv(), Ok("foo")); |
196 | | //! |
197 | | //! // This call would block because the channel is empty. |
198 | | //! // r.recv(); |
199 | | //! |
200 | | //! // Try receiving a message without blocking. |
201 | | //! assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); |
202 | | //! |
203 | | //! // Disconnect the channel. |
204 | | //! drop(s); |
205 | | //! |
206 | | //! // This call doesn't block because the channel is now disconnected. |
207 | | //! assert_eq!(r.recv(), Err(RecvError)); |
208 | | //! ``` |
209 | | //! |
210 | | //! # Iteration |
211 | | //! |
212 | | //! Receivers can be used as iterators. For example, method [`iter`] creates an iterator that |
213 | | //! receives messages until the channel becomes empty and disconnected. Note that iteration may |
214 | | //! block waiting for next message to arrive. |
215 | | //! |
216 | | //! ``` |
217 | | //! use std::thread; |
218 | | //! use crossbeam_channel::unbounded; |
219 | | //! |
220 | | //! let (s, r) = unbounded(); |
221 | | //! |
222 | | //! thread::spawn(move || { |
223 | | //! s.send(1).unwrap(); |
224 | | //! s.send(2).unwrap(); |
225 | | //! s.send(3).unwrap(); |
226 | | //! drop(s); // Disconnect the channel. |
227 | | //! }); |
228 | | //! |
229 | | //! // Collect all messages from the channel. |
230 | | //! // Note that the call to `collect` blocks until the sender is dropped. |
231 | | //! let v: Vec<_> = r.iter().collect(); |
232 | | //! |
233 | | //! assert_eq!(v, [1, 2, 3]); |
234 | | //! ``` |
235 | | //! |
236 | | //! A non-blocking iterator can be created using [`try_iter`], which receives all available |
237 | | //! messages without blocking: |
238 | | //! |
239 | | //! ``` |
240 | | //! use crossbeam_channel::unbounded; |
241 | | //! |
242 | | //! let (s, r) = unbounded(); |
243 | | //! s.send(1).unwrap(); |
244 | | //! s.send(2).unwrap(); |
245 | | //! s.send(3).unwrap(); |
246 | | //! // No need to drop the sender. |
247 | | //! |
248 | | //! // Receive all messages currently in the channel. |
249 | | //! let v: Vec<_> = r.try_iter().collect(); |
250 | | //! |
251 | | //! assert_eq!(v, [1, 2, 3]); |
252 | | //! ``` |
253 | | //! |
254 | | //! # Selection |
255 | | //! |
256 | | //! The [`select!`] macro allows you to define a set of channel operations, wait until any one of |
257 | | //! them becomes ready, and finally execute it. If multiple operations are ready at the same time, |
258 | | //! a random one among them is selected. |
259 | | //! |
260 | | //! It is also possible to define a `default` case that gets executed if none of the operations are |
261 | | //! ready, either right away or for a certain duration of time. |
262 | | //! |
263 | | //! An operation is considered to be ready if it doesn't have to block. Note that it is ready even |
264 | | //! when it will simply return an error because the channel is disconnected. |
265 | | //! |
266 | | //! An example of receiving a message from two channels: |
267 | | //! |
268 | | //! ``` |
269 | | //! use std::thread; |
270 | | //! use std::time::Duration; |
271 | | //! use crossbeam_channel::{select, unbounded}; |
272 | | //! |
273 | | //! let (s1, r1) = unbounded(); |
274 | | //! let (s2, r2) = unbounded(); |
275 | | //! |
276 | | //! thread::spawn(move || s1.send(10).unwrap()); |
277 | | //! thread::spawn(move || s2.send(20).unwrap()); |
278 | | //! |
279 | | //! // At most one of these two receive operations will be executed. |
280 | | //! select! { |
281 | | //! recv(r1) -> msg => assert_eq!(msg, Ok(10)), |
282 | | //! recv(r2) -> msg => assert_eq!(msg, Ok(20)), |
283 | | //! default(Duration::from_secs(1)) => println!("timed out"), |
284 | | //! } |
285 | | //! ``` |
286 | | //! |
287 | | //! If you need to select over a dynamically created list of channel operations, use [`Select`] |
288 | | //! instead. The [`select!`] macro is just a convenience wrapper around [`Select`]. |
289 | | //! |
290 | | //! # Extra channels |
291 | | //! |
292 | | //! Three functions can create special kinds of channels, all of which return just a [`Receiver`] |
293 | | //! handle: |
294 | | //! |
295 | | //! * [`after`] creates a channel that delivers a single message after a certain duration of time. |
296 | | //! * [`tick`] creates a channel that delivers messages periodically. |
297 | | //! * [`never`](never()) creates a channel that never delivers messages. |
298 | | //! |
299 | | //! These channels are very efficient because messages get lazily generated on receive operations. |
300 | | //! |
301 | | //! An example that prints elapsed time every 50 milliseconds for the duration of 1 second: |
302 | | //! |
303 | | //! ``` |
304 | | //! use std::time::{Duration, Instant}; |
305 | | //! use crossbeam_channel::{after, select, tick}; |
306 | | //! |
307 | | //! let start = Instant::now(); |
308 | | //! let ticker = tick(Duration::from_millis(50)); |
309 | | //! let timeout = after(Duration::from_secs(1)); |
310 | | //! |
311 | | //! loop { |
312 | | //! select! { |
313 | | //! recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()), |
314 | | //! recv(timeout) -> _ => break, |
315 | | //! } |
316 | | //! } |
317 | | //! ``` |
318 | | //! |
319 | | //! [`send`]: Sender::send |
320 | | //! [`recv`]: Receiver::recv |
321 | | //! [`iter`]: Receiver::iter |
322 | | //! [`try_iter`]: Receiver::try_iter |
323 | | |
324 | | #![doc(test( |
325 | | no_crate_inject, |
326 | | attr( |
327 | | deny(warnings, rust_2018_idioms), |
328 | | allow(dead_code, unused_assignments, unused_variables) |
329 | | ) |
330 | | ))] |
331 | | #![warn( |
332 | | missing_docs, |
333 | | missing_debug_implementations, |
334 | | rust_2018_idioms, |
335 | | unreachable_pub |
336 | | )] |
337 | | #![cfg_attr(not(feature = "std"), no_std)] |
338 | | // matches! requires Rust 1.42 |
339 | | #![allow(clippy::match_like_matches_macro)] |
340 | | |
341 | | use cfg_if::cfg_if; |
342 | | |
343 | | cfg_if! { |
344 | | if #[cfg(feature = "std")] { |
345 | | mod channel; |
346 | | mod context; |
347 | | mod counter; |
348 | | mod err; |
349 | | mod flavors; |
350 | | mod select; |
351 | | mod select_macro; |
352 | | mod utils; |
353 | | mod waker; |
354 | | |
355 | | /// Crate internals used by the `select!` macro. |
356 | | #[doc(hidden)] |
357 | | pub mod internal { |
358 | | pub use crate::select::SelectHandle; |
359 | | pub use crate::select::{select, select_timeout, try_select}; |
360 | | } |
361 | | |
362 | | pub use crate::channel::{after, at, never, tick}; |
363 | | pub use crate::channel::{bounded, unbounded}; |
364 | | pub use crate::channel::{IntoIter, Iter, TryIter}; |
365 | | pub use crate::channel::{Receiver, Sender}; |
366 | | |
367 | | pub use crate::select::{Select, SelectedOperation}; |
368 | | |
369 | | pub use crate::err::{ReadyTimeoutError, SelectTimeoutError, TryReadyError, TrySelectError}; |
370 | | pub use crate::err::{RecvError, RecvTimeoutError, TryRecvError}; |
371 | | pub use crate::err::{SendError, SendTimeoutError, TrySendError}; |
372 | | } |
373 | | } |