Coverage Report

Created: 2021-01-22 16:54

crossbeam-channel/src/waker.rs
Line
Count
Source (jump to first uncovered line)
1
//! Waking mechanism for threads blocked on channel operations.
2
3
use std::sync::atomic::{AtomicBool, Ordering};
4
use std::thread::{self, ThreadId};
5
6
use crate::context::Context;
7
use crate::select::{Operation, Selected};
8
use crate::utils::Spinlock;
9
10
/// Represents a thread blocked on a specific channel operation.
11
pub(crate) struct Entry {
12
    /// The operation.
13
    pub(crate) oper: Operation,
14
15
    /// Optional packet.
16
    pub(crate) packet: usize,
17
18
    /// Context associated with the thread owning this operation.
19
    pub(crate) cx: Context,
20
}
21
22
/// A queue of threads blocked on channel operations.
23
///
24
/// This data structure is used by threads to register blocking operations and get woken up once
25
/// an operation becomes ready.
26
pub(crate) struct Waker {
27
    /// A list of select operations.
28
    selectors: Vec<Entry>,
29
30
    /// A list of operations waiting to be ready.
31
    observers: Vec<Entry>,
32
}
33
34
impl Waker {
35
    /// Creates a new `Waker`.
36
    #[inline]
37
88.0k
    pub(crate) fn new() -> Self {
38
88.0k
        Waker {
39
88.0k
            selectors: Vec::new(),
40
88.0k
            observers: Vec::new(),
41
88.0k
        }
42
88.0k
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
22.2k
    pub(crate) fn new() -> Self {
38
22.2k
        Waker {
39
22.2k
            selectors: Vec::new(),
40
22.2k
            observers: Vec::new(),
41
22.2k
        }
42
22.2k
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
6.23k
    pub(crate) fn new() -> Self {
38
6.23k
        Waker {
39
6.23k
            selectors: Vec::new(),
40
6.23k
            observers: Vec::new(),
41
6.23k
        }
42
6.23k
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
622
    pub(crate) fn new() -> Self {
38
622
        Waker {
39
622
            selectors: Vec::new(),
40
622
            observers: Vec::new(),
41
622
        }
42
622
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
22.2k
    pub(crate) fn new() -> Self {
38
22.2k
        Waker {
39
22.2k
            selectors: Vec::new(),
40
22.2k
            observers: Vec::new(),
41
22.2k
        }
42
22.2k
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
8
    pub(crate) fn new() -> Self {
38
8
        Waker {
39
8
            selectors: Vec::new(),
40
8
            observers: Vec::new(),
41
8
        }
42
8
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
1
    pub(crate) fn new() -> Self {
38
1
        Waker {
39
1
            selectors: Vec::new(),
40
1
            observers: Vec::new(),
41
1
        }
42
1
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
13
    pub(crate) fn new() -> Self {
38
13
        Waker {
39
13
            selectors: Vec::new(),
40
13
            observers: Vec::new(),
41
13
        }
42
13
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
2
    pub(crate) fn new() -> Self {
38
2
        Waker {
39
2
            selectors: Vec::new(),
40
2
            observers: Vec::new(),
41
2
        }
42
2
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
11.1k
    pub(crate) fn new() -> Self {
38
11.1k
        Waker {
39
11.1k
            selectors: Vec::new(),
40
11.1k
            observers: Vec::new(),
41
11.1k
        }
42
11.1k
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
13.1k
    pub(crate) fn new() -> Self {
38
13.1k
        Waker {
39
13.1k
            selectors: Vec::new(),
40
13.1k
            observers: Vec::new(),
41
13.1k
        }
42
13.1k
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
6.26k
    pub(crate) fn new() -> Self {
38
6.26k
        Waker {
39
6.26k
            selectors: Vec::new(),
40
6.26k
            observers: Vec::new(),
41
6.26k
        }
42
6.26k
    }
<crossbeam_channel::waker::Waker>::new
Line
Count
Source
37
6.07k
    pub(crate) fn new() -> Self {
38
6.07k
        Waker {
39
6.07k
            selectors: Vec::new(),
40
6.07k
            observers: Vec::new(),
41
6.07k
        }
42
6.07k
    }
43
44
    /// Registers a select operation.
45
    #[inline]
46
80.5k
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
80.5k
        self.register_with_packet(oper, 0, cx);
48
80.5k
    }
<crossbeam_channel::waker::Waker>::register
Line
Count
Source
46
486
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
486
        self.register_with_packet(oper, 0, cx);
48
486
    }
<crossbeam_channel::waker::Waker>::register
Line
Count
Source
46
2.19k
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
2.19k
        self.register_with_packet(oper, 0, cx);
48
2.19k
    }
<crossbeam_channel::waker::Waker>::register
Line
Count
Source
46
12.1k
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
12.1k
        self.register_with_packet(oper, 0, cx);
48
12.1k
    }
<crossbeam_channel::waker::Waker>::register
Line
Count
Source
46
697
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
697
        self.register_with_packet(oper, 0, cx);
48
697
    }
<crossbeam_channel::waker::Waker>::register
Line
Count
Source
46
2
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
2
        self.register_with_packet(oper, 0, cx);
48
2
    }
<crossbeam_channel::waker::Waker>::register
Line
Count
Source
46
1.83k
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
1.83k
        self.register_with_packet(oper, 0, cx);
48
1.83k
    }
<crossbeam_channel::waker::Waker>::register
Line
Count
Source
46
51.0k
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
51.0k
        self.register_with_packet(oper, 0, cx);
48
51.0k
    }
<crossbeam_channel::waker::Waker>::register
Line
Count
Source
46
12.0k
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
12.0k
        self.register_with_packet(oper, 0, cx);
48
12.0k
    }
<crossbeam_channel::waker::Waker>::register
Line
Count
Source
46
1
    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47
1
        self.register_with_packet(oper, 0, cx);
48
1
    }
49
50
    /// Registers a select operation and a packet.
51
    #[inline]
52
10.8M
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
10.8M
        self.selectors.push(Entry {
54
10.8M
            oper,
55
10.8M
            packet,
56
10.8M
            cx: cx.clone(),
57
10.8M
        });
58
10.8M
    }
<crossbeam_channel::waker::Waker>::register_with_packet
Line
Count
Source
52
424k
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
424k
        self.selectors.push(Entry {
54
424k
            oper,
55
424k
            packet,
56
424k
            cx: cx.clone(),
57
424k
        });
58
424k
    }
<crossbeam_channel::waker::Waker>::register_with_packet
Line
Count
Source
52
5.99M
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
5.99M
        self.selectors.push(Entry {
54
5.99M
            oper,
55
5.99M
            packet,
56
5.99M
            cx: cx.clone(),
57
5.99M
        });
58
5.99M
    }
<crossbeam_channel::waker::Waker>::register_with_packet
Line
Count
Source
52
46.2k
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
46.2k
        self.selectors.push(Entry {
54
46.2k
            oper,
55
46.2k
            packet,
56
46.2k
            cx: cx.clone(),
57
46.2k
        });
58
46.2k
    }
<crossbeam_channel::waker::Waker>::register_with_packet
Line
Count
Source
52
697
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
697
        self.selectors.push(Entry {
54
697
            oper,
55
697
            packet,
56
697
            cx: cx.clone(),
57
697
        });
58
697
    }
<crossbeam_channel::waker::Waker>::register_with_packet
Line
Count
Source
52
2
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
2
        self.selectors.push(Entry {
54
2
            oper,
55
2
            packet,
56
2
            cx: cx.clone(),
57
2
        });
58
2
    }
<crossbeam_channel::waker::Waker>::register_with_packet
Line
Count
Source
52
1.83k
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
1.83k
        self.selectors.push(Entry {
54
1.83k
            oper,
55
1.83k
            packet,
56
1.83k
            cx: cx.clone(),
57
1.83k
        });
58
1.83k
    }
<crossbeam_channel::waker::Waker>::register_with_packet
Line
Count
Source
52
159k
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
159k
        self.selectors.push(Entry {
54
159k
            oper,
55
159k
            packet,
56
159k
            cx: cx.clone(),
57
159k
        });
58
159k
    }
<crossbeam_channel::waker::Waker>::register_with_packet
Line
Count
Source
52
4.09M
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
4.09M
        self.selectors.push(Entry {
54
4.09M
            oper,
55
4.09M
            packet,
56
4.09M
            cx: cx.clone(),
57
4.09M
        });
58
4.09M
    }
<crossbeam_channel::waker::Waker>::register_with_packet
Line
Count
Source
52
75.0k
    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53
75.0k
        self.selectors.push(Entry {
54
75.0k
            oper,
55
75.0k
            packet,
56
75.0k
            cx: cx.clone(),
57
75.0k
        });
58
75.0k
    }
59
60
    /// Unregisters a select operation.
61
    #[inline]
62
    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
63
8.59M
        if let Some((
i8.53M
, _)) = self
64
8.59M
            .selectors
65
8.59M
            .iter()
66
8.59M
            .enumerate()
67
8.59M
            .find(|&(_, entry)| 
entry.oper == oper8.55M
)
<crossbeam_channel::waker::Waker>::unregister::{closure#0}
Line
Count
Source
67
51.5k
            .find(|&(_, entry)| entry.oper == oper)
<crossbeam_channel::waker::Waker>::unregister::{closure#0}
Line
Count
Source
67
5.15M
            .find(|&(_, entry)| entry.oper == oper)
<crossbeam_channel::waker::Waker>::unregister::{closure#0}
Line
Count
Source
67
10.2k
            .find(|&(_, entry)| entry.oper == oper)
<crossbeam_channel::waker::Waker>::unregister::{closure#0}
Line
Count
Source
67
577
            .find(|&(_, entry)| entry.oper == oper)
<crossbeam_channel::waker::Waker>::unregister::{closure#0}
Line
Count
Source
67
1.54k
            .find(|&(_, entry)| entry.oper == oper)
<crossbeam_channel::waker::Waker>::unregister::{closure#0}
Line
Count
Source
67
60.7k
            .find(|&(_, entry)| entry.oper == oper)
<crossbeam_channel::waker::Waker>::unregister::{closure#0}
Line
Count
Source
67
3.27M
            .find(|&(_, entry)| entry.oper == oper)
<crossbeam_channel::waker::Waker>::unregister::{closure#0}
Line
Count
Source
67
1
            .find(|&(_, entry)| entry.oper == oper)
68
        {
69
8.53M
            let entry = self.selectors.remove(i);
70
8.53M
            Some(entry)
71
        } else {
72
68.5k
            None
73
        }
74
8.59M
    }
<crossbeam_channel::waker::Waker>::unregister
Line
Count
Source
63
55.5k
        if let Some((
i35.5k
, _)) = self
64
55.5k
            .selectors
65
55.5k
            .iter()
66
55.5k
            .enumerate()
67
55.5k
            .find(|&(_, entry)| entry.oper == oper)
68
        {
69
35.5k
            let entry = self.selectors.remove(i);
70
35.5k
            Some(entry)
71
        } else {
72
19.9k
            None
73
        }
74
55.5k
    }
<crossbeam_channel::waker::Waker>::unregister
Line
Count
Source
63
5.16M
        if let Some((
i5.15M
, _)) = self
64
5.16M
            .selectors
65
5.16M
            .iter()
66
5.16M
            .enumerate()
67
5.16M
            .find(|&(_, entry)| entry.oper == oper)
68
        {
69
5.15M
            let entry = self.selectors.remove(i);
70
5.15M
            Some(entry)
71
        } else {
72
3.42k
            None
73
        }
74
5.16M
    }
<crossbeam_channel::waker::Waker>::unregister
Line
Count
Source
63
12.0k
        if let Some((
i10.2k
, _)) = self
64
12.0k
            .selectors
65
12.0k
            .iter()
66
12.0k
            .enumerate()
67
12.0k
            .find(|&(_, entry)| entry.oper == oper)
68
        {
69
10.2k
            let entry = self.selectors.remove(i);
70
10.2k
            Some(entry)
71
        } else {
72
1.79k
            None
73
        }
74
12.0k
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister
<crossbeam_channel::waker::Waker>::unregister
Line
Count
Source
63
576
        if let Some((i, _)) = self
64
576
            .selectors
65
576
            .iter()
66
576
            .enumerate()
67
576
            .find(|&(_, entry)| entry.oper == oper)
68
        {
69
576
            let entry = self.selectors.remove(i);
70
576
            Some(entry)
71
        } else {
72
0
            None
73
        }
74
576
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unregister
<crossbeam_channel::waker::Waker>::unregister
Line
Count
Source
63
1.54k
        if let Some((i, _)) = self
64
1.54k
            .selectors
65
1.54k
            .iter()
66
1.54k
            .enumerate()
67
1.54k
            .find(|&(_, entry)| entry.oper == oper)
68
        {
69
1.54k
            let entry = self.selectors.remove(i);
70
1.54k
            Some(entry)
71
        } else {
72
0
            None
73
        }
74
1.54k
    }
<crossbeam_channel::waker::Waker>::unregister
Line
Count
Source
63
77.8k
        if let Some((
i52.6k
, _)) = self
64
77.8k
            .selectors
65
77.8k
            .iter()
66
77.8k
            .enumerate()
67
77.8k
            .find(|&(_, entry)| entry.oper == oper)
68
        {
69
52.6k
            let entry = self.selectors.remove(i);
70
52.6k
            Some(entry)
71
        } else {
72
25.2k
            None
73
        }
74
77.8k
    }
<crossbeam_channel::waker::Waker>::unregister
Line
Count
Source
63
3.27M
        if let Some((
i3.27M
, _)) = self
64
3.27M
            .selectors
65
3.27M
            .iter()
66
3.27M
            .enumerate()
67
3.27M
            .find(|&(_, entry)| entry.oper == oper)
68
        {
69
3.27M
            let entry = self.selectors.remove(i);
70
3.27M
            Some(entry)
71
        } else {
72
1.84k
            None
73
        }
74
3.27M
    }
<crossbeam_channel::waker::Waker>::unregister
Line
Count
Source
63
16.2k
        if let Some((
i1
, _)) = self
64
16.2k
            .selectors
65
16.2k
            .iter()
66
16.2k
            .enumerate()
67
16.2k
            .find(|&(_, entry)| entry.oper == oper)
68
        {
69
1
            let entry = self.selectors.remove(i);
70
1
            Some(entry)
71
        } else {
72
16.2k
            None
73
        }
74
16.2k
    }
75
76
    /// Attempts to find another thread's entry, select the operation, and wake it up.
77
    #[inline]
78
91.4M
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
91.4M
        let mut entry = None;
80
91.4M
81
91.4M
        if !self.selectors.is_empty() {
82
2.32M
            let thread_id = current_thread_id();
83
84
2.33M
            for i in 0..
self.selectors.len()2.32M
{
85
                // Does the entry belong to a different thread?
86
2.33M
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
2.34M
                    let sel = Selected::Operation(self.selectors[i].oper);
89
2.34M
                    let res = self.selectors[i].cx.try_select(sel);
90
2.34M
91
2.34M
                    if res.is_ok() {
92
                        // Provide the packet.
93
2.31M
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
2.31M
                        // Wake the thread up.
95
2.31M
                        self.selectors[i].cx.unpark();
96
2.31M
97
2.31M
                        // Remove the entry from the queue to keep it clean and improve
98
2.31M
                        // performance.
99
2.31M
                        entry = Some(self.selectors.remove(i));
100
2.31M
                        break;
101
22.3k
                    }
102
18.4E
                }
103
            }
104
89.1M
        }
105
106
91.5M
        entry
107
91.5M
    }
<crossbeam_channel::waker::Waker>::try_select
Line
Count
Source
78
859k
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
859k
        let mut entry = None;
80
859k
81
859k
        if !self.selectors.is_empty() {
82
394k
            let thread_id = current_thread_id();
83
84
400k
            for i in 0..
self.selectors.len()394k
{
85
                // Does the entry belong to a different thread?
86
400k
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
403k
                    let sel = Selected::Operation(self.selectors[i].oper);
89
403k
                    let res = self.selectors[i].cx.try_select(sel);
90
403k
91
403k
                    if res.is_ok() {
92
                        // Provide the packet.
93
398k
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
398k
                        // Wake the thread up.
95
398k
                        self.selectors[i].cx.unpark();
96
398k
97
398k
                        // Remove the entry from the queue to keep it clean and improve
98
398k
                        // performance.
99
398k
                        entry = Some(self.selectors.remove(i));
100
398k
                        break;
101
4.69k
                    }
102
18.4E
                }
103
            }
104
464k
        }
105
106
864k
        entry
107
864k
    }
<crossbeam_channel::waker::Waker>::try_select
Line
Count
Source
78
14.5M
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
14.5M
        let mut entry = None;
80
14.5M
81
14.5M
        if !self.selectors.is_empty() {
82
858k
            let thread_id = current_thread_id();
83
84
862k
            for i in 0..
self.selectors.len()858k
{
85
                // Does the entry belong to a different thread?
86
862k
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
862k
                    let sel = Selected::Operation(self.selectors[i].oper);
89
862k
                    let res = self.selectors[i].cx.try_select(sel);
90
862k
91
862k
                    if res.is_ok() {
92
                        // Provide the packet.
93
860k
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
860k
                        // Wake the thread up.
95
860k
                        self.selectors[i].cx.unpark();
96
860k
97
860k
                        // Remove the entry from the queue to keep it clean and improve
98
860k
                        // performance.
99
860k
                        entry = Some(self.selectors.remove(i));
100
860k
                        break;
101
1.69k
                    }
102
18.4E
                }
103
            }
104
13.6M
        }
105
106
14.5M
        entry
107
14.5M
    }
<crossbeam_channel::waker::Waker>::try_select
Line
Count
Source
78
71.9k
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
71.9k
        let mut entry = None;
80
71.9k
81
71.9k
        if !self.selectors.is_empty() {
82
37.7k
            let thread_id = current_thread_id();
83
84
38.1k
            for i in 0..
self.selectors.len()37.7k
{
85
                // Does the entry belong to a different thread?
86
38.1k
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
38.2k
                    let sel = Selected::Operation(self.selectors[i].oper);
89
38.2k
                    let res = self.selectors[i].cx.try_select(sel);
90
38.2k
91
38.2k
                    if res.is_ok() {
92
                        // Provide the packet.
93
37.4k
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
37.4k
                        // Wake the thread up.
95
37.4k
                        self.selectors[i].cx.unpark();
96
37.4k
97
37.4k
                        // Remove the entry from the queue to keep it clean and improve
98
37.4k
                        // performance.
99
37.4k
                        entry = Some(self.selectors.remove(i));
100
37.4k
                        break;
101
775
                    }
102
18.4E
                }
103
            }
104
34.1k
        }
105
106
72.1k
        entry
107
72.1k
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select
<crossbeam_channel::waker::Waker>::try_select
Line
Count
Source
78
280
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
280
        let mut entry = None;
80
280
81
280
        if !self.selectors.is_empty() {
82
280
            let thread_id = current_thread_id();
83
84
280
            for i in 0..self.selectors.len() {
85
                // Does the entry belong to a different thread?
86
280
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
280
                    let sel = Selected::Operation(self.selectors[i].oper);
89
280
                    let res = self.selectors[i].cx.try_select(sel);
90
280
91
280
                    if res.is_ok() {
92
                        // Provide the packet.
93
121
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
121
                        // Wake the thread up.
95
121
                        self.selectors[i].cx.unpark();
96
121
97
121
                        // Remove the entry from the queue to keep it clean and improve
98
121
                        // performance.
99
121
                        entry = Some(self.selectors.remove(i));
100
121
                        break;
101
159
                    }
102
0
                }
103
            }
104
0
        }
105
106
280
        entry
107
280
    }
<crossbeam_channel::waker::Waker>::try_select
Line
Count
Source
78
2
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
2
        let mut entry = None;
80
2
81
2
        if !self.selectors.is_empty() {
82
2
            let thread_id = current_thread_id();
83
84
2
            for i in 0..self.selectors.len() {
85
                // Does the entry belong to a different thread?
86
2
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
2
                    let sel = Selected::Operation(self.selectors[i].oper);
89
2
                    let res = self.selectors[i].cx.try_select(sel);
90
2
91
2
                    if res.is_ok() {
92
                        // Provide the packet.
93
2
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
2
                        // Wake the thread up.
95
2
                        self.selectors[i].cx.unpark();
96
2
97
2
                        // Remove the entry from the queue to keep it clean and improve
98
2
                        // performance.
99
2
                        entry = Some(self.selectors.remove(i));
100
2
                        break;
101
0
                    }
102
0
                }
103
            }
104
0
        }
105
106
2
        entry
107
2
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::try_select
<crossbeam_channel::waker::Waker>::try_select
Line
Count
Source
78
590
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
590
        let mut entry = None;
80
590
81
590
        if !self.selectors.is_empty() {
82
590
            let thread_id = current_thread_id();
83
84
590
            for i in 0..self.selectors.len() {
85
                // Does the entry belong to a different thread?
86
590
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
589
                    let sel = Selected::Operation(self.selectors[i].oper);
89
589
                    let res = self.selectors[i].cx.try_select(sel);
90
589
91
589
                    if res.is_ok() {
92
                        // Provide the packet.
93
290
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
290
                        // Wake the thread up.
95
290
                        self.selectors[i].cx.unpark();
96
290
97
290
                        // Remove the entry from the queue to keep it clean and improve
98
290
                        // performance.
99
290
                        entry = Some(self.selectors.remove(i));
100
290
                        break;
101
299
                    }
102
1
                }
103
            }
104
0
        }
105
106
589
        entry
107
589
    }
<crossbeam_channel::waker::Waker>::try_select
Line
Count
Source
78
287k
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
287k
        let mut entry = None;
80
287k
81
287k
        if !self.selectors.is_empty() {
82
118k
            let thread_id = current_thread_id();
83
84
119k
            for i in 0..
self.selectors.len()118k
{
85
                // Does the entry belong to a different thread?
86
119k
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
120k
                    let sel = Selected::Operation(self.selectors[i].oper);
89
120k
                    let res = self.selectors[i].cx.try_select(sel);
90
120k
91
120k
                    if res.is_ok() {
92
                        // Provide the packet.
93
107k
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
107k
                        // Wake the thread up.
95
107k
                        self.selectors[i].cx.unpark();
96
107k
97
107k
                        // Remove the entry from the queue to keep it clean and improve
98
107k
                        // performance.
99
107k
                        entry = Some(self.selectors.remove(i));
100
107k
                        break;
101
12.2k
                    }
102
18.4E
                }
103
            }
104
169k
        }
105
106
287k
        entry
107
287k
    }
<crossbeam_channel::waker::Waker>::try_select
Line
Count
Source
78
75.5M
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
75.5M
        let mut entry = None;
80
75.5M
81
75.5M
        if !self.selectors.is_empty() {
82
837k
            let thread_id = current_thread_id();
83
84
841k
            for i in 0..
self.selectors.len()837k
{
85
                // Does the entry belong to a different thread?
86
841k
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
841k
                    let sel = Selected::Operation(self.selectors[i].oper);
89
841k
                    let res = self.selectors[i].cx.try_select(sel);
90
841k
91
841k
                    if res.is_ok() {
92
                        // Provide the packet.
93
838k
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
838k
                        // Wake the thread up.
95
838k
                        self.selectors[i].cx.unpark();
96
838k
97
838k
                        // Remove the entry from the queue to keep it clean and improve
98
838k
                        // performance.
99
838k
                        entry = Some(self.selectors.remove(i));
100
838k
                        break;
101
2.28k
                    }
102
18.4E
                }
103
            }
104
74.7M
        }
105
106
75.5M
        entry
107
75.5M
    }
<crossbeam_channel::waker::Waker>::try_select
Line
Count
Source
78
187k
    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79
187k
        let mut entry = None;
80
187k
81
187k
        if !self.selectors.is_empty() {
82
75.5k
            let thread_id = current_thread_id();
83
84
75.6k
            for i in 0..
self.selectors.len()75.5k
{
85
                // Does the entry belong to a different thread?
86
75.6k
                if self.selectors[i].cx.thread_id() != thread_id {
87
                    // Try selecting this operation.
88
75.5k
                    let sel = Selected::Operation(self.selectors[i].oper);
89
75.5k
                    let res = self.selectors[i].cx.try_select(sel);
90
75.5k
91
75.5k
                    if res.is_ok() {
92
                        // Provide the packet.
93
75.3k
                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94
75.3k
                        // Wake the thread up.
95
75.3k
                        self.selectors[i].cx.unpark();
96
75.3k
97
75.3k
                        // Remove the entry from the queue to keep it clean and improve
98
75.3k
                        // performance.
99
75.3k
                        entry = Some(self.selectors.remove(i));
100
75.3k
                        break;
101
221
                    }
102
76
                }
103
            }
104
112k
        }
105
106
187k
        entry
107
187k
    }
108
109
    /// Returns `true` if there is an entry which can be selected by the current thread.
110
    #[inline]
111
8.66M
    pub(crate) fn can_select(&self) -> bool {
112
8.66M
        if self.selectors.is_empty() {
113
8.58M
            false
114
        } else {
115
88.1k
            let thread_id = current_thread_id();
116
88.1k
117
89.9k
            self.selectors.iter().any(|entry| {
118
89.9k
                entry.cx.thread_id() != thread_id && 
entry.cx.selected() == Selected::Waiting89.1k
119
89.6k
            }
)88.1k
<crossbeam_channel::waker::Waker>::can_select::{closure#0}
Line
Count
Source
117
13.1k
            self.selectors.iter().any(|entry| {
118
13.1k
                
entry.cx.thread_id() != thread_id13.1k
&& entry.cx.selected() == Selected::Waiting
119
13.1k
            })
<crossbeam_channel::waker::Waker>::can_select::{closure#0}
Line
Count
Source
117
3.15k
            self.selectors.iter().any(|entry| {
118
3.15k
                entry.cx.thread_id() != thread_id && 
entry.cx.selected() == Selected::Waiting3.08k
119
3.15k
            })
<crossbeam_channel::waker::Waker>::can_select::{closure#0}
Line
Count
Source
117
1
            self.selectors.iter().any(|entry| {
118
1
                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119
1
            })
<crossbeam_channel::waker::Waker>::can_select::{closure#0}
Line
Count
Source
117
13.8k
            self.selectors.iter().any(|entry| {
118
13.8k
                entry.cx.thread_id() != thread_id && 
entry.cx.selected() == Selected::Waiting13.1k
119
13.6k
            })
<crossbeam_channel::waker::Waker>::can_select::{closure#0}
Line
Count
Source
117
3.79k
            self.selectors.iter().any(|entry| {
118
3.79k
                entry.cx.thread_id() != thread_id && 
entry.cx.selected() == Selected::Waiting3.73k
119
3.79k
            })
<crossbeam_channel::waker::Waker>::can_select::{closure#0}
Line
Count
Source
117
55.9k
            self.selectors.iter().any(|entry| {
118
55.9k
                entry.cx.thread_id() != thread_id && 
entry.cx.selected() == Selected::Waiting55.9k
119
55.8k
            })
120
        }
121
8.66M
    }
<crossbeam_channel::waker::Waker>::can_select
Line
Count
Source
111
54.7k
    pub(crate) fn can_select(&self) -> bool {
112
54.7k
        if self.selectors.is_empty() {
113
43.1k
            false
114
        } else {
115
11.6k
            let thread_id = current_thread_id();
116
11.6k
117
11.6k
            self.selectors.iter().any(|entry| {
118
                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119
11.6k
            })
120
        }
121
54.7k
    }
<crossbeam_channel::waker::Waker>::can_select
Line
Count
Source
111
5.15M
    pub(crate) fn can_select(&self) -> bool {
112
5.15M
        if self.selectors.is_empty() {
113
5.15M
            false
114
        } else {
115
3.84k
            let thread_id = current_thread_id();
116
3.84k
117
3.84k
            self.selectors.iter().any(|entry| {
118
                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119
3.84k
            })
120
        }
121
5.15M
    }
<crossbeam_channel::waker::Waker>::can_select
Line
Count
Source
111
1
    pub(crate) fn can_select(&self) -> bool {
112
1
        if self.selectors.is_empty() {
113
0
            false
114
        } else {
115
1
            let thread_id = current_thread_id();
116
1
117
1
            self.selectors.iter().any(|entry| {
118
                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119
1
            })
120
        }
121
1
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::can_select
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::can_select
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::can_select
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::can_select
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::can_select
<crossbeam_channel::waker::Waker>::can_select
Line
Count
Source
111
58.0k
    pub(crate) fn can_select(&self) -> bool {
112
58.0k
        if self.selectors.is_empty() {
113
44.5k
            false
114
        } else {
115
13.5k
            let thread_id = current_thread_id();
116
13.5k
117
13.5k
            self.selectors.iter().any(|entry| {
118
                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119
13.5k
            })
120
        }
121
58.0k
    }
<crossbeam_channel::waker::Waker>::can_select
Line
Count
Source
111
3.26M
    pub(crate) fn can_select(&self) -> bool {
112
3.26M
        if self.selectors.is_empty() {
113
3.25M
            false
114
        } else {
115
3.66k
            let thread_id = current_thread_id();
116
3.66k
117
3.66k
            self.selectors.iter().any(|entry| {
118
                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119
3.66k
            })
120
        }
121
3.26M
    }
<crossbeam_channel::waker::Waker>::can_select
Line
Count
Source
111
134k
    pub(crate) fn can_select(&self) -> bool {
112
134k
        if self.selectors.is_empty() {
113
79.2k
            false
114
        } else {
115
55.4k
            let thread_id = current_thread_id();
116
55.4k
117
55.4k
            self.selectors.iter().any(|entry| {
118
                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119
55.4k
            })
120
        }
121
134k
    }
122
123
    /// Registers an operation waiting to be ready.
124
    #[inline]
125
27
    pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
126
27
        self.observers.push(Entry {
127
27
            oper,
128
27
            packet: 0,
129
27
            cx: cx.clone(),
130
27
        });
131
27
    }
132
133
    /// Unregisters an operation waiting to be ready.
134
    #[inline]
135
27
    pub(crate) fn unwatch(&mut self, oper: Operation) {
136
27
        self.observers.retain(|e| 
e.oper != oper19
);
137
27
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::unwatch
<crossbeam_channel::waker::Waker>::unwatch
Line
Count
Source
135
27
    pub(crate) fn unwatch(&mut self, oper: Operation) {
136
27
        self.observers.retain(|e| e.oper != oper);
137
27
    }
138
139
    /// Notifies all operations waiting to be ready.
140
    #[inline]
141
10.8M
    pub(crate) fn notify(&mut self) {
142
10.8M
        for 
entry8
in self.observers.drain(..) {
143
8
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
7
                entry.cx.unpark();
145
7
            }
1
146
        }
147
10.8M
    }
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
442k
    pub(crate) fn notify(&mut self) {
142
442k
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
447k
    }
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
5.99M
    pub(crate) fn notify(&mut self) {
142
5.99M
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
6.01M
    }
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
37.2k
    pub(crate) fn notify(&mut self) {
142
37.2k
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
37.9k
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::notify
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
22.5k
    pub(crate) fn notify(&mut self) {
142
22.5k
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
22.5k
    }
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
9
    pub(crate) fn notify(&mut self) {
142
9
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
10
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::notify
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
1
    pub(crate) fn notify(&mut self) {
142
1
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
1
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::notify
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
13
    pub(crate) fn notify(&mut self) {
142
13
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
13
    }
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
2
    pub(crate) fn notify(&mut self) {
142
2
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
2
    }
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
11.7k
    pub(crate) fn notify(&mut self) {
142
11.7k
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
11.7k
    }
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
164k
    pub(crate) fn notify(&mut self) {
142
164k
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
165k
    }
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
4.09M
    pub(crate) fn notify(&mut self) {
142
4.09M
        for 
entry0
in self.observers.drain(..) {
143
0
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
0
                entry.cx.unpark();
145
0
            }
146
        }
147
4.10M
    }
<crossbeam_channel::waker::Waker>::notify
Line
Count
Source
141
80.7k
    pub(crate) fn notify(&mut self) {
142
80.7k
        for 
entry8
in self.observers.drain(..) {
143
8
            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144
7
                entry.cx.unpark();
145
7
            }
1
146
        }
147
81.2k
    }
148
149
    /// Notifies all registered operations that the channel is disconnected.
150
    #[inline]
151
88.0k
    pub(crate) fn disconnect(&mut self) {
152
88.0k
        for 
entry360
in self.selectors.iter() {
153
360
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
26
                // Wake the thread up.
155
26
                //
156
26
                // Here we don't remove the entry from the queue. Registered threads must
157
26
                // unregister from the waker by themselves. They might also want to recover the
158
26
                // packet value and destroy it, if necessary.
159
26
                entry.cx.unpark();
160
334
            }
161
        }
162
163
88.0k
        self.notify();
164
88.0k
    }
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
22.2k
    pub(crate) fn disconnect(&mut self) {
152
22.2k
        for 
entry5
in self.selectors.iter() {
153
5
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
5
                // Wake the thread up.
155
5
                //
156
5
                // Here we don't remove the entry from the queue. Registered threads must
157
5
                // unregister from the waker by themselves. They might also want to recover the
158
5
                // packet value and destroy it, if necessary.
159
5
                entry.cx.unpark();
160
5
            }
0
161
        }
162
163
22.2k
        self.notify();
164
22.2k
    }
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
6.23k
    pub(crate) fn disconnect(&mut self) {
152
6.23k
        for 
entry334
in self.selectors.iter() {
153
334
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
1
                // Wake the thread up.
155
1
                //
156
1
                // Here we don't remove the entry from the queue. Registered threads must
157
1
                // unregister from the waker by themselves. They might also want to recover the
158
1
                // packet value and destroy it, if necessary.
159
1
                entry.cx.unpark();
160
333
            }
161
        }
162
163
6.23k
        self.notify();
164
6.23k
    }
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
624
    pub(crate) fn disconnect(&mut self) {
152
624
        for 
entry7
in self.selectors.iter() {
153
7
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
7
                // Wake the thread up.
155
7
                //
156
7
                // Here we don't remove the entry from the queue. Registered threads must
157
7
                // unregister from the waker by themselves. They might also want to recover the
158
7
                // packet value and destroy it, if necessary.
159
7
                entry.cx.unpark();
160
7
            }
0
161
        }
162
163
623
        self.notify();
164
623
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::disconnect
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
22.2k
    pub(crate) fn disconnect(&mut self) {
152
22.2k
        for 
entry2
in self.selectors.iter() {
153
2
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
2
                // Wake the thread up.
155
2
                //
156
2
                // Here we don't remove the entry from the queue. Registered threads must
157
2
                // unregister from the waker by themselves. They might also want to recover the
158
2
                // packet value and destroy it, if necessary.
159
2
                entry.cx.unpark();
160
2
            }
0
161
        }
162
163
22.2k
        self.notify();
164
22.2k
    }
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
8
    pub(crate) fn disconnect(&mut self) {
152
8
        for 
entry0
in self.selectors.iter() {
153
0
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
0
                // Wake the thread up.
155
0
                //
156
0
                // Here we don't remove the entry from the queue. Registered threads must
157
0
                // unregister from the waker by themselves. They might also want to recover the
158
0
                // packet value and destroy it, if necessary.
159
0
                entry.cx.unpark();
160
0
            }
161
        }
162
163
7
        self.notify();
164
7
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::disconnect
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
1
    pub(crate) fn disconnect(&mut self) {
152
1
        for 
entry0
in self.selectors.iter() {
153
0
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
0
                // Wake the thread up.
155
0
                //
156
0
                // Here we don't remove the entry from the queue. Registered threads must
157
0
                // unregister from the waker by themselves. They might also want to recover the
158
0
                // packet value and destroy it, if necessary.
159
0
                entry.cx.unpark();
160
0
            }
161
        }
162
163
1
        self.notify();
164
1
    }
Unexecuted instantiation: <crossbeam_channel::waker::Waker>::disconnect
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
13
    pub(crate) fn disconnect(&mut self) {
152
13
        for 
entry0
in self.selectors.iter() {
153
0
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
0
                // Wake the thread up.
155
0
                //
156
0
                // Here we don't remove the entry from the queue. Registered threads must
157
0
                // unregister from the waker by themselves. They might also want to recover the
158
0
                // packet value and destroy it, if necessary.
159
0
                entry.cx.unpark();
160
0
            }
161
        }
162
163
13
        self.notify();
164
13
    }
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
2
    pub(crate) fn disconnect(&mut self) {
152
2
        for 
entry0
in self.selectors.iter() {
153
0
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
0
                // Wake the thread up.
155
0
                //
156
0
                // Here we don't remove the entry from the queue. Registered threads must
157
0
                // unregister from the waker by themselves. They might also want to recover the
158
0
                // packet value and destroy it, if necessary.
159
0
                entry.cx.unpark();
160
0
            }
161
        }
162
163
2
        self.notify();
164
2
    }
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
11.1k
    pub(crate) fn disconnect(&mut self) {
152
11.1k
        for 
entry1
in self.selectors.iter() {
153
1
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
1
                // Wake the thread up.
155
1
                //
156
1
                // Here we don't remove the entry from the queue. Registered threads must
157
1
                // unregister from the waker by themselves. They might also want to recover the
158
1
                // packet value and destroy it, if necessary.
159
1
                entry.cx.unpark();
160
1
            }
0
161
        }
162
163
11.1k
        self.notify();
164
11.1k
    }
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
13.1k
    pub(crate) fn disconnect(&mut self) {
152
13.1k
        for 
entry6
in self.selectors.iter() {
153
6
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
5
                // Wake the thread up.
155
5
                //
156
5
                // Here we don't remove the entry from the queue. Registered threads must
157
5
                // unregister from the waker by themselves. They might also want to recover the
158
5
                // packet value and destroy it, if necessary.
159
5
                entry.cx.unpark();
160
5
            }
1
161
        }
162
163
13.1k
        self.notify();
164
13.1k
    }
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
6.27k
    pub(crate) fn disconnect(&mut self) {
152
6.27k
        for 
entry5
in self.selectors.iter() {
153
5
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
5
                // Wake the thread up.
155
5
                //
156
5
                // Here we don't remove the entry from the queue. Registered threads must
157
5
                // unregister from the waker by themselves. They might also want to recover the
158
5
                // packet value and destroy it, if necessary.
159
5
                entry.cx.unpark();
160
5
            }
0
161
        }
162
163
6.27k
        self.notify();
164
6.27k
    }
<crossbeam_channel::waker::Waker>::disconnect
Line
Count
Source
151
6.07k
    pub(crate) fn disconnect(&mut self) {
152
6.07k
        for 
entry0
in self.selectors.iter() {
153
0
            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154
0
                // Wake the thread up.
155
0
                //
156
0
                // Here we don't remove the entry from the queue. Registered threads must
157
0
                // unregister from the waker by themselves. They might also want to recover the
158
0
                // packet value and destroy it, if necessary.
159
0
                entry.cx.unpark();
160
0
            }
161
        }
162
163
6.07k
        self.notify();
164
6.07k
    }
165
}
166
167
impl Drop for Waker {
168
    #[inline]
169
88.0k
    fn drop(&mut self) {
170
88.0k
        debug_assert_eq!(self.selectors.len(), 0);
171
88.0k
        debug_assert_eq!(self.observers.len(), 0);
172
88.0k
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
22.2k
    fn drop(&mut self) {
170
22.2k
        debug_assert_eq!(self.selectors.len(), 0);
171
22.2k
        debug_assert_eq!(self.observers.len(), 0);
172
22.2k
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
6.23k
    fn drop(&mut self) {
170
6.23k
        debug_assert_eq!(self.selectors.len(), 0);
171
6.23k
        debug_assert_eq!(self.observers.len(), 0);
172
6.23k
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
623
    fn drop(&mut self) {
170
623
        debug_assert_eq!(self.selectors.len(), 0);
171
623
        debug_assert_eq!(self.observers.len(), 0);
172
623
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
22.2k
    fn drop(&mut self) {
170
22.2k
        debug_assert_eq!(self.selectors.len(), 0);
171
22.2k
        debug_assert_eq!(self.observers.len(), 0);
172
22.2k
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
8
    fn drop(&mut self) {
170
8
        debug_assert_eq!(self.selectors.len(), 0);
171
8
        debug_assert_eq!(self.observers.len(), 0);
172
8
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
1
    fn drop(&mut self) {
170
1
        debug_assert_eq!(self.selectors.len(), 0);
171
1
        debug_assert_eq!(self.observers.len(), 0);
172
1
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
13
    fn drop(&mut self) {
170
13
        debug_assert_eq!(self.selectors.len(), 0);
171
13
        debug_assert_eq!(self.observers.len(), 0);
172
13
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
2
    fn drop(&mut self) {
170
2
        debug_assert_eq!(self.selectors.len(), 0);
171
2
        debug_assert_eq!(self.observers.len(), 0);
172
2
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
11.1k
    fn drop(&mut self) {
170
11.1k
        debug_assert_eq!(self.selectors.len(), 0);
171
11.1k
        debug_assert_eq!(self.observers.len(), 0);
172
11.1k
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
13.1k
    fn drop(&mut self) {
170
13.1k
        debug_assert_eq!(self.selectors.len(), 0);
171
13.1k
        debug_assert_eq!(self.observers.len(), 0);
172
13.1k
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
6.27k
    fn drop(&mut self) {
170
6.27k
        debug_assert_eq!(self.selectors.len(), 0);
171
6.27k
        debug_assert_eq!(self.observers.len(), 0);
172
6.27k
    }
<crossbeam_channel::waker::Waker as core::ops::drop::Drop>::drop
Line
Count
Source
169
6.07k
    fn drop(&mut self) {
170
6.07k
        debug_assert_eq!(self.selectors.len(), 0);
171
6.07k
        debug_assert_eq!(self.observers.len(), 0);
172
6.07k
    }
173
}
174
175
/// A waker that can be shared among threads without locking.
176
///
177
/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
178
pub(crate) struct SyncWaker {
179
    /// The inner `Waker`.
180
    inner: Spinlock<Waker>,
181
182
    /// `true` if the waker is empty.
183
    is_empty: AtomicBool,
184
}
185
186
impl SyncWaker {
187
    /// Creates a new `SyncWaker`.
188
    #[inline]
189
78.4k
    pub(crate) fn new() -> Self {
190
78.4k
        SyncWaker {
191
78.4k
            inner: Spinlock::new(Waker::new()),
192
78.4k
            is_empty: AtomicBool::new(true),
193
78.4k
        }
194
78.4k
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
20.0k
    pub(crate) fn new() -> Self {
190
20.0k
        SyncWaker {
191
20.0k
            inner: Spinlock::new(Waker::new()),
192
20.0k
            is_empty: AtomicBool::new(true),
193
20.0k
        }
194
20.0k
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
4.06k
    pub(crate) fn new() -> Self {
190
4.06k
        SyncWaker {
191
4.06k
            inner: Spinlock::new(Waker::new()),
192
4.06k
            is_empty: AtomicBool::new(true),
193
4.06k
        }
194
4.06k
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
537
    pub(crate) fn new() -> Self {
190
537
        SyncWaker {
191
537
            inner: Spinlock::new(Waker::new()),
192
537
            is_empty: AtomicBool::new(true),
193
537
        }
194
537
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
22.2k
    pub(crate) fn new() -> Self {
190
22.2k
        SyncWaker {
191
22.2k
            inner: Spinlock::new(Waker::new()),
192
22.2k
            is_empty: AtomicBool::new(true),
193
22.2k
        }
194
22.2k
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
8
    pub(crate) fn new() -> Self {
190
8
        SyncWaker {
191
8
            inner: Spinlock::new(Waker::new()),
192
8
            is_empty: AtomicBool::new(true),
193
8
        }
194
8
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
1
    pub(crate) fn new() -> Self {
190
1
        SyncWaker {
191
1
            inner: Spinlock::new(Waker::new()),
192
1
            is_empty: AtomicBool::new(true),
193
1
        }
194
1
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
7
    pub(crate) fn new() -> Self {
190
7
        SyncWaker {
191
7
            inner: Spinlock::new(Waker::new()),
192
7
            is_empty: AtomicBool::new(true),
193
7
        }
194
7
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
2
    pub(crate) fn new() -> Self {
190
2
        SyncWaker {
191
2
            inner: Spinlock::new(Waker::new()),
192
2
            is_empty: AtomicBool::new(true),
193
2
        }
194
2
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
11.1k
    pub(crate) fn new() -> Self {
190
11.1k
        SyncWaker {
191
11.1k
            inner: Spinlock::new(Waker::new()),
192
11.1k
            is_empty: AtomicBool::new(true),
193
11.1k
        }
194
11.1k
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
10.3k
    pub(crate) fn new() -> Self {
190
10.3k
        SyncWaker {
191
10.3k
            inner: Spinlock::new(Waker::new()),
192
10.3k
            is_empty: AtomicBool::new(true),
193
10.3k
        }
194
10.3k
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
4.07k
    pub(crate) fn new() -> Self {
190
4.07k
        SyncWaker {
191
4.07k
            inner: Spinlock::new(Waker::new()),
192
4.07k
            is_empty: AtomicBool::new(true),
193
4.07k
        }
194
4.07k
    }
<crossbeam_channel::waker::SyncWaker>::new
Line
Count
Source
189
6.04k
    pub(crate) fn new() -> Self {
190
6.04k
        SyncWaker {
191
6.04k
            inner: Spinlock::new(Waker::new()),
192
6.04k
            is_empty: AtomicBool::new(true),
193
6.04k
        }
194
6.04k
    }
195
196
    /// Registers the current thread with an operation.
197
    #[inline]
198
80.1k
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
80.1k
        let mut inner = self.inner.lock();
200
80.1k
        inner.register(oper, cx);
201
80.1k
        self.is_empty.store(
202
80.1k
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
80.1k
            Ordering::SeqCst,
204
80.1k
        );
205
80.1k
    }
<crossbeam_channel::waker::SyncWaker>::register
Line
Count
Source
198
486
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
486
        let mut inner = self.inner.lock();
200
486
        inner.register(oper, cx);
201
486
        self.is_empty.store(
202
486
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
486
            Ordering::SeqCst,
204
486
        );
205
486
    }
<crossbeam_channel::waker::SyncWaker>::register
Line
Count
Source
198
2.19k
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
2.19k
        let mut inner = self.inner.lock();
200
2.19k
        inner.register(oper, cx);
201
2.19k
        self.is_empty.store(
202
2.19k
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
2.19k
            Ordering::SeqCst,
204
2.19k
        );
205
2.19k
    }
<crossbeam_channel::waker::SyncWaker>::register
Line
Count
Source
198
12.1k
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
12.1k
        let mut inner = self.inner.lock();
200
12.1k
        inner.register(oper, cx);
201
12.1k
        self.is_empty.store(
202
12.1k
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
12.1k
            Ordering::SeqCst,
204
12.1k
        );
205
12.1k
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::register
<crossbeam_channel::waker::SyncWaker>::register
Line
Count
Source
198
698
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
698
        let mut inner = self.inner.lock();
200
698
        inner.register(oper, cx);
201
698
        self.is_empty.store(
202
698
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
698
            Ordering::SeqCst,
204
698
        );
205
698
    }
<crossbeam_channel::waker::SyncWaker>::register
Line
Count
Source
198
2
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
2
        let mut inner = self.inner.lock();
200
2
        inner.register(oper, cx);
201
2
        self.is_empty.store(
202
2
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
2
            Ordering::SeqCst,
204
2
        );
205
2
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::register
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::register
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::register
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::register
<crossbeam_channel::waker::SyncWaker>::register
Line
Count
Source
198
1.83k
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
1.83k
        let mut inner = self.inner.lock();
200
1.83k
        inner.register(oper, cx);
201
1.83k
        self.is_empty.store(
202
1.83k
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
1.83k
            Ordering::SeqCst,
204
1.83k
        );
205
1.83k
    }
<crossbeam_channel::waker::SyncWaker>::register
Line
Count
Source
198
50.7k
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
50.7k
        let mut inner = self.inner.lock();
200
50.7k
        inner.register(oper, cx);
201
50.7k
        self.is_empty.store(
202
50.7k
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
50.7k
            Ordering::SeqCst,
204
50.7k
        );
205
50.7k
    }
<crossbeam_channel::waker::SyncWaker>::register
Line
Count
Source
198
12.0k
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
12.0k
        let mut inner = self.inner.lock();
200
12.0k
        inner.register(oper, cx);
201
12.0k
        self.is_empty.store(
202
12.0k
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
12.0k
            Ordering::SeqCst,
204
12.0k
        );
205
12.0k
    }
<crossbeam_channel::waker::SyncWaker>::register
Line
Count
Source
198
1
    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199
1
        let mut inner = self.inner.lock();
200
1
        inner.register(oper, cx);
201
1
        self.is_empty.store(
202
1
            inner.selectors.is_empty() && 
inner.observers.is_empty()0
,
203
1
            Ordering::SeqCst,
204
1
        );
205
1
    }
206
207
    /// Unregisters an operation previously registered by the current thread.
208
    #[inline]
209
48.3k
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210
48.3k
        let mut inner = self.inner.lock();
211
48.3k
        let entry = inner.unregister(oper);
212
48.3k
        self.is_empty.store(
213
48.3k
            inner.selectors.is_empty() && 
inner.observers.is_empty()41.9k
,
214
48.3k
            Ordering::SeqCst,
215
48.3k
        );
216
48.3k
        entry
217
48.3k
    }
<crossbeam_channel::waker::SyncWaker>::unregister
Line
Count
Source
209
407
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210
407
        let mut inner = self.inner.lock();
211
407
        let entry = inner.unregister(oper);
212
407
        self.is_empty.store(
213
407
            inner.selectors.is_empty() && inner.observers.is_empty(),
214
407
            Ordering::SeqCst,
215
407
        );
216
407
        entry
217
407
    }
<crossbeam_channel::waker::SyncWaker>::unregister
Line
Count
Source
209
2.19k
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210
2.19k
        let mut inner = self.inner.lock();
211
2.19k
        let entry = inner.unregister(oper);
212
2.19k
        self.is_empty.store(
213
2.19k
            inner.selectors.is_empty() && inner.observers.is_empty(),
214
2.19k
            Ordering::SeqCst,
215
2.19k
        );
216
2.19k
        entry
217
2.19k
    }
<crossbeam_channel::waker::SyncWaker>::unregister
Line
Count
Source
209
12.0k
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210
12.0k
        let mut inner = self.inner.lock();
211
12.0k
        let entry = inner.unregister(oper);
212
12.0k
        self.is_empty.store(
213
12.0k
            inner.selectors.is_empty() && inner.observers.is_empty(),
214
12.0k
            Ordering::SeqCst,
215
12.0k
        );
216
12.0k
        entry
217
12.0k
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister
<crossbeam_channel::waker::SyncWaker>::unregister
Line
Count
Source
209
577
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210
577
        let mut inner = self.inner.lock();
211
577
        let entry = inner.unregister(oper);
212
577
        self.is_empty.store(
213
577
            inner.selectors.is_empty() && inner.observers.is_empty(),
214
577
            Ordering::SeqCst,
215
577
        );
216
577
        entry
217
577
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister
<crossbeam_channel::waker::SyncWaker>::unregister
Line
Count
Source
209
1.54k
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210
1.54k
        let mut inner = self.inner.lock();
211
1.54k
        let entry = inner.unregister(oper);
212
1.54k
        self.is_empty.store(
213
1.54k
            inner.selectors.is_empty() && inner.observers.is_empty(),
214
1.54k
            Ordering::SeqCst,
215
1.54k
        );
216
1.54k
        entry
217
1.54k
    }
<crossbeam_channel::waker::SyncWaker>::unregister
Line
Count
Source
209
19.5k
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210
19.5k
        let mut inner = self.inner.lock();
211
19.5k
        let entry = inner.unregister(oper);
212
19.5k
        self.is_empty.store(
213
19.5k
            inner.selectors.is_empty() && 
inner.observers.is_empty()13.1k
,
214
19.5k
            Ordering::SeqCst,
215
19.5k
        );
216
19.5k
        entry
217
19.5k
    }
<crossbeam_channel::waker::SyncWaker>::unregister
Line
Count
Source
209
12.0k
    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210
12.0k
        let mut inner = self.inner.lock();
211
12.0k
        let entry = inner.unregister(oper);
212
12.0k
        self.is_empty.store(
213
12.0k
            inner.selectors.is_empty() && inner.observers.is_empty(),
214
12.0k
            Ordering::SeqCst,
215
12.0k
        );
216
12.0k
        entry
217
12.0k
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unregister
218
219
    /// Attempts to find one thread (not the current one), select its operation, and wake it up.
220
    #[inline]
221
7.37M
    pub(crate) fn notify(&self) {
222
7.37M
        if !self.is_empty.load(Ordering::SeqCst) {
223
51.4k
            let mut inner = self.inner.lock();
224
51.4k
            if !self.is_empty.load(Ordering::SeqCst) {
225
48.5k
                inner.try_select();
226
48.5k
                inner.notify();
227
48.5k
                self.is_empty.store(
228
48.5k
                    inner.selectors.is_empty() && 
inner.observers.is_empty()20.2k
,
229
48.5k
                    Ordering::SeqCst,
230
                );
231
2.91k
            }
232
7.31M
        }
233
7.37M
    }
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
20.0k
    pub(crate) fn notify(&self) {
222
20.0k
        if !self.is_empty.load(Ordering::SeqCst) {
223
104
            let mut inner = self.inner.lock();
224
104
            if !self.is_empty.load(Ordering::SeqCst) {
225
104
                inner.try_select();
226
104
                inner.notify();
227
104
                self.is_empty.store(
228
104
                    inner.selectors.is_empty() && 
inner.observers.is_empty()79
,
229
104
                    Ordering::SeqCst,
230
                );
231
0
            }
232
19.8k
        }
233
20.0k
    }
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
1.34M
    pub(crate) fn notify(&self) {
222
1.34M
        if !self.is_empty.load(Ordering::SeqCst) {
223
679
            let mut inner = self.inner.lock();
224
679
            if !self.is_empty.load(Ordering::SeqCst) {
225
670
                inner.try_select();
226
670
                inner.notify();
227
670
                self.is_empty.store(
228
670
                    inner.selectors.is_empty() && 
inner.observers.is_empty()607
,
229
670
                    Ordering::SeqCst,
230
                );
231
9
            }
232
1.34M
        }
233
1.34M
    }
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
141k
    pub(crate) fn notify(&self) {
222
141k
        if !self.is_empty.load(Ordering::SeqCst) {
223
2.49k
            let mut inner = self.inner.lock();
224
2.49k
            if !self.is_empty.load(Ordering::SeqCst) {
225
2.47k
                inner.try_select();
226
2.47k
                inner.notify();
227
2.47k
                self.is_empty.store(
228
2.47k
                    inner.selectors.is_empty() && 
inner.observers.is_empty()1.93k
,
229
2.47k
                    Ordering::SeqCst,
230
                );
231
17
            }
232
139k
        }
233
141k
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::notify
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
1.77M
    pub(crate) fn notify(&self) {
222
1.77M
        if !self.is_empty.load(Ordering::SeqCst) {
223
296
            let mut inner = self.inner.lock();
224
296
            if !self.is_empty.load(Ordering::SeqCst) {
225
280
                inner.try_select();
226
280
                inner.notify();
227
280
                self.is_empty.store(
228
280
                    inner.selectors.is_empty() && 
inner.observers.is_empty()121
,
229
280
                    Ordering::SeqCst,
230
                );
231
16
            }
232
1.77M
        }
233
1.77M
    }
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
63
    pub(crate) fn notify(&self) {
222
63
        if !self.is_empty.load(Ordering::SeqCst) {
223
2
            let mut inner = self.inner.lock();
224
2
            if !self.is_empty.load(Ordering::SeqCst) {
225
2
                inner.try_select();
226
2
                inner.notify();
227
2
                self.is_empty.store(
228
2
                    inner.selectors.is_empty() && inner.observers.is_empty(),
229
2
                    Ordering::SeqCst,
230
                );
231
0
            }
232
61
        }
233
63
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::notify
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
2
    pub(crate) fn notify(&self) {
222
2
        if !self.is_empty.load(Ordering::SeqCst) {
223
0
            let mut inner = self.inner.lock();
224
0
            if !self.is_empty.load(Ordering::SeqCst) {
225
0
                inner.try_select();
226
0
                inner.notify();
227
0
                self.is_empty.store(
228
0
                    inner.selectors.is_empty() && inner.observers.is_empty(),
229
0
                    Ordering::SeqCst,
230
                );
231
0
            }
232
2
        }
233
2
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::notify
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
1
    pub(crate) fn notify(&self) {
222
1
        if !self.is_empty.load(Ordering::SeqCst) {
223
0
            let mut inner = self.inner.lock();
224
0
            if !self.is_empty.load(Ordering::SeqCst) {
225
0
                inner.try_select();
226
0
                inner.notify();
227
0
                self.is_empty.store(
228
0
                    inner.selectors.is_empty() && inner.observers.is_empty(),
229
0
                    Ordering::SeqCst,
230
                );
231
0
            }
232
1
        }
233
1
    }
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
1.00M
    pub(crate) fn notify(&self) {
222
1.00M
        if !self.is_empty.load(Ordering::SeqCst) {
223
642
            let mut inner = self.inner.lock();
224
642
            if !self.is_empty.load(Ordering::SeqCst) {
225
590
                inner.try_select();
226
590
                inner.notify();
227
590
                self.is_empty.store(
228
590
                    inner.selectors.is_empty() && 
inner.observers.is_empty()291
,
229
589
                    Ordering::SeqCst,
230
                );
231
52
            }
232
1.00M
        }
233
1.00M
    }
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
1.51M
    pub(crate) fn notify(&self) {
222
1.51M
        if !self.is_empty.load(Ordering::SeqCst) {
223
46.0k
            let mut inner = self.inner.lock();
224
46.0k
            if !self.is_empty.load(Ordering::SeqCst) {
225
43.2k
                inner.try_select();
226
43.2k
                inner.notify();
227
43.2k
                self.is_empty.store(
228
43.2k
                    inner.selectors.is_empty() && 
inner.observers.is_empty()16.4k
,
229
43.2k
                    Ordering::SeqCst,
230
                );
231
2.82k
            }
232
1.46M
        }
233
1.51M
    }
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
1.35M
    pub(crate) fn notify(&self) {
222
1.35M
        if !self.is_empty.load(Ordering::SeqCst) {
223
1.15k
            let mut inner = self.inner.lock();
224
1.15k
            if !self.is_empty.load(Ordering::SeqCst) {
225
1.15k
                inner.try_select();
226
1.15k
                inner.notify();
227
1.15k
                self.is_empty.store(
228
1.15k
                    inner.selectors.is_empty() && 
inner.observers.is_empty()825
,
229
1.15k
                    Ordering::SeqCst,
230
                );
231
1
            }
232
1.35M
        }
233
1.35M
    }
<crossbeam_channel::waker::SyncWaker>::notify
Line
Count
Source
221
219k
    pub(crate) fn notify(&self) {
222
219k
        if !self.is_empty.load(Ordering::SeqCst) {
223
5
            let mut inner = self.inner.lock();
224
5
            if !self.is_empty.load(Ordering::SeqCst) {
225
5
                inner.try_select();
226
5
                inner.notify();
227
5
                self.is_empty.store(
228
5
                    inner.selectors.is_empty() && inner.observers.is_empty(),
229
5
                    Ordering::SeqCst,
230
                );
231
0
            }
232
219k
        }
233
219k
    }
234
235
    /// Registers an operation waiting to be ready.
236
    #[inline]
237
17
    pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
238
17
        let mut inner = self.inner.lock();
239
17
        inner.watch(oper, cx);
240
17
        self.is_empty.store(
241
17
            inner.selectors.is_empty() && inner.observers.is_empty(),
242
17
            Ordering::SeqCst,
243
17
        );
244
17
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::watch
<crossbeam_channel::waker::SyncWaker>::watch
Line
Count
Source
237
17
    pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
238
17
        let mut inner = self.inner.lock();
239
17
        inner.watch(oper, cx);
240
17
        self.is_empty.store(
241
17
            inner.selectors.is_empty() && inner.observers.is_empty(),
242
17
            Ordering::SeqCst,
243
17
        );
244
17
    }
245
246
    /// Unregisters an operation waiting to be ready.
247
    #[inline]
248
17
    pub(crate) fn unwatch(&self, oper: Operation) {
249
17
        let mut inner = self.inner.lock();
250
17
        inner.unwatch(oper);
251
17
        self.is_empty.store(
252
17
            inner.selectors.is_empty() && inner.observers.is_empty(),
253
17
            Ordering::SeqCst,
254
17
        );
255
17
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::unwatch
<crossbeam_channel::waker::SyncWaker>::unwatch
Line
Count
Source
248
17
    pub(crate) fn unwatch(&self, oper: Operation) {
249
17
        let mut inner = self.inner.lock();
250
17
        inner.unwatch(oper);
251
17
        self.is_empty.store(
252
17
            inner.selectors.is_empty() && inner.observers.is_empty(),
253
17
            Ordering::SeqCst,
254
17
        );
255
17
    }
256
257
    /// Notifies all threads that the channel is disconnected.
258
    #[inline]
259
78.3k
    pub(crate) fn disconnect(&self) {
260
78.3k
        let mut inner = self.inner.lock();
261
78.3k
        inner.disconnect();
262
78.3k
        self.is_empty.store(
263
78.3k
            inner.selectors.is_empty() && 
inner.observers.is_empty()78.0k
,
264
78.3k
            Ordering::SeqCst,
265
78.3k
        );
266
78.3k
    }
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
20.0k
    pub(crate) fn disconnect(&self) {
260
20.0k
        let mut inner = self.inner.lock();
261
20.0k
        inner.disconnect();
262
20.0k
        self.is_empty.store(
263
20.0k
            inner.selectors.is_empty() && inner.observers.is_empty(),
264
20.0k
            Ordering::SeqCst,
265
20.0k
        );
266
20.0k
    }
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
4.06k
    pub(crate) fn disconnect(&self) {
260
4.06k
        let mut inner = self.inner.lock();
261
4.06k
        inner.disconnect();
262
4.06k
        self.is_empty.store(
263
4.06k
            inner.selectors.is_empty() && 
inner.observers.is_empty()3.73k
,
264
4.06k
            Ordering::SeqCst,
265
4.06k
        );
266
4.06k
    }
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
538
    pub(crate) fn disconnect(&self) {
260
538
        let mut inner = self.inner.lock();
261
538
        inner.disconnect();
262
538
        self.is_empty.store(
263
538
            inner.selectors.is_empty() && 
inner.observers.is_empty()537
,
264
538
            Ordering::SeqCst,
265
538
        );
266
538
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::disconnect
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
22.2k
    pub(crate) fn disconnect(&self) {
260
22.2k
        let mut inner = self.inner.lock();
261
22.2k
        inner.disconnect();
262
22.2k
        self.is_empty.store(
263
22.2k
            inner.selectors.is_empty() && 
inner.observers.is_empty()22.2k
,
264
22.2k
            Ordering::SeqCst,
265
22.2k
        );
266
22.2k
    }
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
8
    pub(crate) fn disconnect(&self) {
260
8
        let mut inner = self.inner.lock();
261
8
        inner.disconnect();
262
8
        self.is_empty.store(
263
8
            inner.selectors.is_empty() && 
inner.observers.is_empty()7
,
264
8
            Ordering::SeqCst,
265
8
        );
266
8
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::disconnect
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
1
    pub(crate) fn disconnect(&self) {
260
1
        let mut inner = self.inner.lock();
261
1
        inner.disconnect();
262
1
        self.is_empty.store(
263
1
            inner.selectors.is_empty() && inner.observers.is_empty(),
264
1
            Ordering::SeqCst,
265
1
        );
266
1
    }
Unexecuted instantiation: <crossbeam_channel::waker::SyncWaker>::disconnect
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
7
    pub(crate) fn disconnect(&self) {
260
7
        let mut inner = self.inner.lock();
261
7
        inner.disconnect();
262
7
        self.is_empty.store(
263
7
            inner.selectors.is_empty() && inner.observers.is_empty(),
264
7
            Ordering::SeqCst,
265
7
        );
266
7
    }
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
2
    pub(crate) fn disconnect(&self) {
260
2
        let mut inner = self.inner.lock();
261
2
        inner.disconnect();
262
2
        self.is_empty.store(
263
2
            inner.selectors.is_empty() && inner.observers.is_empty(),
264
2
            Ordering::SeqCst,
265
2
        );
266
2
    }
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
11.1k
    pub(crate) fn disconnect(&self) {
260
11.1k
        let mut inner = self.inner.lock();
261
11.1k
        inner.disconnect();
262
11.1k
        self.is_empty.store(
263
11.1k
            inner.selectors.is_empty() && 
inner.observers.is_empty()11.1k
,
264
11.1k
            Ordering::SeqCst,
265
11.1k
        );
266
11.1k
    }
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
10.2k
    pub(crate) fn disconnect(&self) {
260
10.2k
        let mut inner = self.inner.lock();
261
10.2k
        inner.disconnect();
262
10.2k
        self.is_empty.store(
263
10.2k
            inner.selectors.is_empty() && 
inner.observers.is_empty()10.2k
,
264
10.2k
            Ordering::SeqCst,
265
10.2k
        );
266
10.2k
    }
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
4.07k
    pub(crate) fn disconnect(&self) {
260
4.07k
        let mut inner = self.inner.lock();
261
4.07k
        inner.disconnect();
262
4.07k
        self.is_empty.store(
263
4.07k
            inner.selectors.is_empty() && 
inner.observers.is_empty()4.07k
,
264
4.07k
            Ordering::SeqCst,
265
4.07k
        );
266
4.07k
    }
<crossbeam_channel::waker::SyncWaker>::disconnect
Line
Count
Source
259
6.04k
    pub(crate) fn disconnect(&self) {
260
6.04k
        let mut inner = self.inner.lock();
261
6.04k
        inner.disconnect();
262
6.04k
        self.is_empty.store(
263
6.04k
            
inner.selectors.is_empty()6.04k
&& inner.observers.is_empty(),
264
6.04k
            Ordering::SeqCst,
265
6.04k
        );
266
6.04k
    }
267
}
268
269
impl Drop for SyncWaker {
270
    #[inline]
271
78.3k
    fn drop(&mut self) {
272
78.3k
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
78.3k
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
20.0k
    fn drop(&mut self) {
272
20.0k
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
20.0k
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
4.06k
    fn drop(&mut self) {
272
4.06k
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
4.06k
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
538
    fn drop(&mut self) {
272
538
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
538
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
22.2k
    fn drop(&mut self) {
272
22.2k
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
22.2k
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
8
    fn drop(&mut self) {
272
8
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
8
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
1
    fn drop(&mut self) {
272
1
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
1
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
7
    fn drop(&mut self) {
272
7
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
7
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
2
    fn drop(&mut self) {
272
2
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
2
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
11.1k
    fn drop(&mut self) {
272
11.1k
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
11.1k
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
10.2k
    fn drop(&mut self) {
272
10.2k
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
10.2k
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
4.07k
    fn drop(&mut self) {
272
4.07k
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
4.07k
    }
<crossbeam_channel::waker::SyncWaker as core::ops::drop::Drop>::drop
Line
Count
Source
271
6.04k
    fn drop(&mut self) {
272
6.04k
        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273
6.04k
    }
274
}
275
276
/// Returns the id of the current thread.
277
#[inline]
278
2.40M
fn current_thread_id() -> ThreadId {
279
2.40M
    thread_local! {
280
2.40M
        /// Cached thread-local id.
281
2.40M
        static THREAD_ID: ThreadId = thread::current().id();
282
2.40M
    }
283
2.40M
284
2.40M
    THREAD_ID
285
2.41M
        .try_with(|id| *id)
crossbeam_channel::waker::current_thread_id::{closure#0}
Line
Count
Source
285
408k
        .try_with(|id| *id)
crossbeam_channel::waker::current_thread_id::{closure#0}
Line
Count
Source
285
862k
        .try_with(|id| *id)
crossbeam_channel::waker::current_thread_id::{closure#0}
Line
Count
Source
285
37.9k
        .try_with(|id| *id)
crossbeam_channel::waker::current_thread_id::{closure#0}
Line
Count
Source
285
280
        .try_with(|id| *id)
crossbeam_channel::waker::current_thread_id::{closure#0}
Line
Count
Source
285
2
        .try_with(|id| *id)
crossbeam_channel::waker::current_thread_id::{closure#0}
Line
Count
Source
285
589
        .try_with(|id| *id)
crossbeam_channel::waker::current_thread_id::{closure#0}
Line
Count
Source
285
131k
        .try_with(|id| *id)
crossbeam_channel::waker::current_thread_id::{closure#0}
Line
Count
Source
285
840k
        .try_with(|id| *id)
crossbeam_channel::waker::current_thread_id::{closure#0}
Line
Count
Source
285
130k
        .try_with(|id| *id)
286
2.40M
        .unwrap_or_else(|_| thread::current().id())
287
2.40M
}
crossbeam_channel::waker::current_thread_id
Line
Count
Source
278
404k
fn current_thread_id() -> ThreadId {
279
404k
    thread_local! {
280
404k
        /// Cached thread-local id.
281
404k
        static THREAD_ID: ThreadId = thread::current().id();
282
404k
    }
283
404k
284
404k
    THREAD_ID
285
404k
        .try_with(|id| *id)
286
404k
        .unwrap_or_else(|_| thread::current().id())
287
404k
}
crossbeam_channel::waker::current_thread_id
Line
Count
Source
278
861k
fn current_thread_id() -> ThreadId {
279
861k
    thread_local! {
280
861k
        /// Cached thread-local id.
281
861k
        static THREAD_ID: ThreadId = thread::current().id();
282
861k
    }
283
861k
284
861k
    THREAD_ID
285
861k
        .try_with(|id| *id)
286
861k
        .unwrap_or_else(|_| thread::current().id())
287
861k
}
crossbeam_channel::waker::current_thread_id
Line
Count
Source
278
37.7k
fn current_thread_id() -> ThreadId {
279
37.7k
    thread_local! {
280
37.7k
        /// Cached thread-local id.
281
37.7k
        static THREAD_ID: ThreadId = thread::current().id();
282
37.7k
    }
283
37.7k
284
37.7k
    THREAD_ID
285
37.7k
        .try_with(|id| *id)
286
37.7k
        .unwrap_or_else(|_| thread::current().id())
287
37.7k
}
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id
crossbeam_channel::waker::current_thread_id
Line
Count
Source
278
280
fn current_thread_id() -> ThreadId {
279
280
    thread_local! {
280
280
        /// Cached thread-local id.
281
280
        static THREAD_ID: ThreadId = thread::current().id();
282
280
    }
283
280
284
280
    THREAD_ID
285
280
        .try_with(|id| *id)
286
280
        .unwrap_or_else(|_| thread::current().id())
287
280
}
crossbeam_channel::waker::current_thread_id
Line
Count
Source
278
2
fn current_thread_id() -> ThreadId {
279
2
    thread_local! {
280
2
        /// Cached thread-local id.
281
2
        static THREAD_ID: ThreadId = thread::current().id();
282
2
    }
283
2
284
2
    THREAD_ID
285
2
        .try_with(|id| *id)
286
2
        .unwrap_or_else(|_| thread::current().id())
287
2
}
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id
Unexecuted instantiation: crossbeam_channel::waker::current_thread_id
crossbeam_channel::waker::current_thread_id
Line
Count
Source
278
590
fn current_thread_id() -> ThreadId {
279
590
    thread_local! {
280
590
        /// Cached thread-local id.
281
590
        static THREAD_ID: ThreadId = thread::current().id();
282
590
    }
283
590
284
590
    THREAD_ID
285
590
        .try_with(|id| *id)
286
590
        .unwrap_or_else(|_| thread::current().id())
287
590
}
crossbeam_channel::waker::current_thread_id
Line
Count
Source
278
131k
fn current_thread_id() -> ThreadId {
279
131k
    thread_local! {
280
131k
        /// Cached thread-local id.
281
131k
        static THREAD_ID: ThreadId = thread::current().id();
282
131k
    }
283
131k
284
131k
    THREAD_ID
285
131k
        .try_with(|id| *id)
286
131k
        .unwrap_or_else(|_| thread::current().id())
287
131k
}
crossbeam_channel::waker::current_thread_id
Line
Count
Source
278
840k
fn current_thread_id() -> ThreadId {
279
840k
    thread_local! {
280
840k
        /// Cached thread-local id.
281
840k
        static THREAD_ID: ThreadId = thread::current().id();
282
840k
    }
283
840k
284
840k
    THREAD_ID
285
840k
        .try_with(|id| *id)
286
840k
        .unwrap_or_else(|_| thread::current().id())
287
840k
}
crossbeam_channel::waker::current_thread_id
Line
Count
Source
278
129k
fn current_thread_id() -> ThreadId {
279
129k
    thread_local! {
280
129k
        /// Cached thread-local id.
281
129k
        static THREAD_ID: ThreadId = thread::current().id();
282
129k
    }
283
129k
284
129k
    THREAD_ID
285
129k
        .try_with(|id| *id)
286
129k
        .unwrap_or_else(|_| thread::current().id())
287
129k
}