Coverage Report

Created: 2021-01-22 16:54

crossbeam-epoch/src/internal.rs
Line |  Count | Source
   1 |        | //! The global data and participant for garbage collection.
   2 |        | //!
   3 |        | //! # Registration
   4 |        | //!
   5 |        | //! In order to track all participants in one place, we need some form of participant
   6 |        | //! registration. When a participant is created, it is registered to a global lock-free
   7 |        | //! singly-linked list of registries; and when a participant leaves, it is unregistered from the
   8 |        | //! list.
   9 |        | //!
  10 |        | //! # Pinning
  11 |        | //!
  12 |        | //! Every participant contains an integer that tells whether the participant is pinned and, if so,
  13 |        | //! what the global epoch was at the time it was pinned. Participants also hold a pin counter that
  14 |        | //! aids in periodic global epoch advancement.
  15 |        | //!
  16 |        | //! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
  17 |        | //! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
  18 |        | //!
  19 |        | //! # Thread-local bag
  20 |        | //!
  21 |        | //! Objects that get unlinked from concurrent data structures must be stashed away until the global
  22 |        | //! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
  23 |        | //! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
  24 |        | //! global epoch and pushed into the global queue of bags. We store objects in thread-local storage
  25 |        | //! to amortize the synchronization cost of pushing garbage to the global queue.
  26 |        | //!
  27 |        | //! # Global queue
  28 |        | //!
  29 |        | //! Whenever a bag is pushed into the queue, the objects in some of the queued bags are collected and
  30 |        | //! destroyed along the way. This design reduces contention on data structures. The global queue
  31 |        | //! cannot be explicitly accessed: the only way to interact with it is by calling `defer()`, which
  32 |        | //! adds an object to the thread-local bag, or `collect()`, which manually triggers
  33 |        | //! garbage collection.
  34 |        | //!
  35 |        | //! Ideally, each instance of a concurrent data structure would have its own queue that gets fully
  36 |        | //! destroyed as soon as the data structure is dropped.
  37 |        |
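The registration/pinning/bag/queue machinery described above is driven through crossbeam-epoch's public API. A minimal usage sketch (public `pin`/`swap`/`defer_destroy` interface; exact signatures are assumed from the crate's public docs and may differ from the revision measured here):

    use std::sync::atomic::Ordering;
    use crossbeam_epoch::{self as epoch, Atomic, Owned};

    fn main() {
        // An atomic pointer to a heap-allocated value managed by the collector.
        let a = Atomic::new(1234);

        // Pinning registers this thread's participant (if needed) and returns a `Guard`.
        let guard = epoch::pin();

        // Replace the value; the old allocation may still be read by other pinned
        // threads, so its destruction is deferred into the thread-local bag and,
        // eventually, the global queue described above.
        let old = a.swap(Owned::new(5678), Ordering::AcqRel, &guard);
        unsafe { guard.defer_destroy(old) };

        // Dropping the guard unpins the participant; the deferred destructor runs
        // once the global epoch has advanced far enough.
        drop(guard);

        // Reclaim the final value as well (we are the only owner at this point).
        unsafe { drop(a.into_owned()) };
    }

The deferred destructor only runs once the global epoch is at least two advancements past the bag's sealed epoch, which is exactly the condition checked by `SealedBag::is_expired` further down in this file.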
  38 |        | use crate::primitive::cell::UnsafeCell;
  39 |        | use crate::primitive::sync::atomic;
  40 |        | use core::cell::Cell;
  41 |        | use core::mem::{self, ManuallyDrop};
  42 |        | use core::num::Wrapping;
  43 |        | use core::sync::atomic::Ordering;
  44 |        | use core::{fmt, ptr};
  45 |        |
  46 |        | use crossbeam_utils::CachePadded;
  47 |        | use memoffset::offset_of;
  48 |        |
  49 |        | use crate::atomic::{Owned, Shared};
  50 |        | use crate::collector::{Collector, LocalHandle};
  51 |        | use crate::deferred::Deferred;
  52 |        | use crate::epoch::{AtomicEpoch, Epoch};
  53 |        | use crate::guard::{unprotected, Guard};
  54 |        | use crate::sync::list::{Entry, IsElement, IterError, List};
  55 |        | use crate::sync::queue::Queue;
  56 |        |
  57 |        | /// Maximum number of objects a bag can contain.
  58 |        | #[cfg(not(crossbeam_sanitize))]
  59 |        | const MAX_OBJECTS: usize = 62;
  60 |        | #[cfg(crossbeam_sanitize)]
  61 |        | const MAX_OBJECTS: usize = 4;
  62 |        |
  63 |        | /// A bag of deferred functions.
  64 |        | pub(crate) struct Bag {
  65 |        |     /// Stashed objects.
  66 |        |     deferreds: [Deferred; MAX_OBJECTS],
  67 |        |     len: usize,
  68 |        | }
  69 |        |
  70 |        | /// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
  71 |        | unsafe impl Send for Bag {}
  72 |        |
  73 |        | impl Bag {
  74 |        |     /// Returns a new, empty bag.
  75 |  65.3k |     pub(crate) fn new() -> Self {
  76 |  65.3k |         Self::default()
  77 |  65.3k |     }
  78 |        |
  79 |        |     /// Returns `true` if the bag is empty.
  80 |    743 |     pub(crate) fn is_empty(&self) -> bool {
  81 |    743 |         self.len == 0
  82 |    743 |     }
  83 |        |
  84 |        |     /// Attempts to insert a deferred function into the bag.
  85 |        |     ///
  86 |        |     /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
  87 |        |     /// full.
  88 |        |     ///
  89 |        |     /// # Safety
  90 |        |     ///
  91 |        |     /// It should be safe for another thread to execute the given function.
  92 |  3.79M |     pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
  93 |  3.79M |         if self.len < MAX_OBJECTS {
  94 |  3.71M |             self.deferreds[self.len] = deferred;
  95 |  3.71M |             self.len += 1;
  96 |  3.71M |             Ok(())
  97 |        |         } else {
  98 |  78.7k |             Err(deferred)
  99 |        |         }
 100 |  3.79M |     }
 101 |        |
 102 |        |     /// Seals the bag with the given epoch.
 103 |  65.1k |     fn seal(self, epoch: Epoch) -> SealedBag {
 104 |  65.1k |         SealedBag { epoch, bag: self }
 105 |  65.1k |     }
 106 |        | }
 107 |        |
 108 |        | impl Default for Bag {
 109 |        |     #[rustfmt::skip]
 110 |  65.3k |     fn default() -> Self {
 111 |  65.3k |         // TODO: [no_op; MAX_OBJECTS] syntax blocked by https://github.com/rust-lang/rust/issues/49147
 112 |  65.3k |         #[cfg(not(crossbeam_sanitize))]
 113 |  65.3k |         return Bag {
 114 |  65.3k |             len: 0,
 115 |  65.3k |             deferreds: [
 116 |  65.3k |                 Deferred::new(no_op_func),
 117 |  65.3k |                 Deferred::new(no_op_func),
 118 |  65.3k |                 Deferred::new(no_op_func),
 119 |  65.3k |                 Deferred::new(no_op_func),
 120 |  65.3k |                 Deferred::new(no_op_func),
 121 |  65.3k |                 Deferred::new(no_op_func),
 122 |  65.3k |                 Deferred::new(no_op_func),
 123 |  65.3k |                 Deferred::new(no_op_func),
 124 |  65.3k |                 Deferred::new(no_op_func),
 125 |  65.3k |                 Deferred::new(no_op_func),
 126 |  65.3k |                 Deferred::new(no_op_func),
 127 |  65.3k |                 Deferred::new(no_op_func),
 128 |  65.3k |                 Deferred::new(no_op_func),
 129 |  65.3k |                 Deferred::new(no_op_func),
 130 |  65.3k |                 Deferred::new(no_op_func),
 131 |  65.3k |                 Deferred::new(no_op_func),
 132 |  65.3k |                 Deferred::new(no_op_func),
 133 |  65.3k |                 Deferred::new(no_op_func),
 134 |  65.3k |                 Deferred::new(no_op_func),
 135 |  65.3k |                 Deferred::new(no_op_func),
 136 |  65.3k |                 Deferred::new(no_op_func),
 137 |  65.3k |                 Deferred::new(no_op_func),
 138 |  65.3k |                 Deferred::new(no_op_func),
 139 |  65.3k |                 Deferred::new(no_op_func),
 140 |  65.3k |                 Deferred::new(no_op_func),
 141 |  65.3k |                 Deferred::new(no_op_func),
 142 |  65.3k |                 Deferred::new(no_op_func),
 143 |  65.3k |                 Deferred::new(no_op_func),
 144 |  65.3k |                 Deferred::new(no_op_func),
 145 |  65.3k |                 Deferred::new(no_op_func),
 146 |  65.3k |                 Deferred::new(no_op_func),
 147 |  65.3k |                 Deferred::new(no_op_func),
 148 |  65.3k |                 Deferred::new(no_op_func),
 149 |  65.3k |                 Deferred::new(no_op_func),
 150 |  65.3k |                 Deferred::new(no_op_func),
 151 |  65.3k |                 Deferred::new(no_op_func),
 152 |  65.3k |                 Deferred::new(no_op_func),
 153 |  65.3k |                 Deferred::new(no_op_func),
 154 |  65.3k |                 Deferred::new(no_op_func),
 155 |  65.3k |                 Deferred::new(no_op_func),
 156 |  65.3k |                 Deferred::new(no_op_func),
 157 |  65.3k |                 Deferred::new(no_op_func),
 158 |  65.3k |                 Deferred::new(no_op_func),
 159 |  65.3k |                 Deferred::new(no_op_func),
 160 |  65.3k |                 Deferred::new(no_op_func),
 161 |  65.3k |                 Deferred::new(no_op_func),
 162 |  65.3k |                 Deferred::new(no_op_func),
 163 |  65.3k |                 Deferred::new(no_op_func),
 164 |  65.3k |                 Deferred::new(no_op_func),
 165 |  65.3k |                 Deferred::new(no_op_func),
 166 |  65.3k |                 Deferred::new(no_op_func),
 167 |  65.3k |                 Deferred::new(no_op_func),
 168 |  65.3k |                 Deferred::new(no_op_func),
 169 |  65.3k |                 Deferred::new(no_op_func),
 170 |  65.3k |                 Deferred::new(no_op_func),
 171 |  65.3k |                 Deferred::new(no_op_func),
 172 |  65.3k |                 Deferred::new(no_op_func),
 173 |  65.3k |                 Deferred::new(no_op_func),
 174 |  65.3k |                 Deferred::new(no_op_func),
 175 |  65.3k |                 Deferred::new(no_op_func),
 176 |  65.3k |                 Deferred::new(no_op_func),
 177 |  65.3k |                 Deferred::new(no_op_func),
 178 |  65.3k |             ],
 179 |  65.3k |         };
 180 |  65.3k |         #[cfg(crossbeam_sanitize)]
 181 |  65.3k |         return Bag {
 182 |  65.3k |             len: 0,
 183 |  65.3k |             deferreds: [
 184 |  65.3k |                 Deferred::new(no_op_func),
 185 |  65.3k |                 Deferred::new(no_op_func),
 186 |  65.3k |                 Deferred::new(no_op_func),
 187 |  65.3k |                 Deferred::new(no_op_func),
 188 |  65.3k |             ],
 189 |  65.3k |         };
 190 |  65.3k |     }
 191 |        | }
 192 |        |
 193 |        | impl Drop for Bag {
 194 |  65.1k |     fn drop(&mut self) {
 195 |        |         // Call all deferred functions.
 196 |  3.70M |         for deferred in &mut self.deferreds[..self.len] {
 197 |  3.70M |             let no_op = Deferred::new(no_op_func);
 198 |  3.70M |             let owned_deferred = mem::replace(deferred, no_op);
 199 |  3.70M |             owned_deferred.call();
 200 |  3.70M |         }
 201 |  65.2k |     }
 202 |        | }
 203 |        |
 204 |        | // can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
 205 |        | impl fmt::Debug for Bag {
 206 |      0 |     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 207 |      0 |         f.debug_struct("Bag")
 208 |      0 |             .field("deferreds", &&self.deferreds[..self.len])
 209 |      0 |             .finish()
 210 |      0 |     }
 211 |        | }
 212 |        |
 213 |      0 | fn no_op_func() {}
 214 |        |
 215 |        | /// A pair of an epoch and a bag.
 216 |      0 | #[derive(Default, Debug)]
 217 |        | struct SealedBag {
 218 |        |     epoch: Epoch,
 219 |        |     bag: Bag,
 220 |        | }
 221 |        |
 222 |        | /// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
 223 |        | unsafe impl Sync for SealedBag {}
 224 |        |
 225 |        | impl SealedBag {
 226 |        |     /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
 227 |   162k |     fn is_expired(&self, global_epoch: Epoch) -> bool {
 228 |   162k |         // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
 229 |   162k |         // is within one epoch of the current one cannot be destroyed yet.
 230 |   162k |         global_epoch.wrapping_sub(self.epoch) >= 2
 231 |   162k |     }
 232 |        | }
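The `>= 2` threshold in `is_expired` is the whole expiry rule: a bag sealed at epoch `e` may be destroyed only once the global epoch is at least two advancements past `e`, because a pinned participant can straddle at most one advancement. An illustrative sketch with bare integers (the crate's `Epoch` is an opaque wrapper with its own `wrapping_sub`, so this is a simplification, not the real type):

    // Illustration only: models the expiry rule, not crossbeam's `Epoch` type.
    fn is_expired(global_epoch: usize, sealed_epoch: usize) -> bool {
        global_epoch.wrapping_sub(sealed_epoch) >= 2
    }

    fn main() {
        assert!(!is_expired(5, 5)); // sealed in the current epoch: not safe yet
        assert!(!is_expired(6, 5)); // one advancement: a pinned thread may still see it
        assert!(is_expired(7, 5));  // two advancements: no pinned thread can reach it
    }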
 233 |        |
 234 |        | /// The global data for a garbage collector.
 235 |        | pub(crate) struct Global {
 236 |        |     /// The intrusive linked list of `Local`s.
 237 |        |     locals: List<Local>,
 238 |        |
 239 |        |     /// The global queue of bags of deferred functions.
 240 |        |     queue: Queue<SealedBag>,
 241 |        |
 242 |        |     /// The global epoch.
 243 |        |     pub(crate) epoch: CachePadded<AtomicEpoch>,
 244 |        | }
 245 |        |
 246 |        | impl Global {
 247 |        |     /// Number of bags to destroy.
 248 |        |     const COLLECT_STEPS: usize = 8;
 249 |        |
 250 |        |     /// Creates a new global data for garbage collection.
 251 |        |     #[inline]
 252 |     58 |     pub(crate) fn new() -> Self {
 253 |     58 |         Self {
 254 |     58 |             locals: List::new(),
 255 |     58 |             queue: Queue::new(),
 256 |     58 |             epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
 257 |     58 |         }
 258 |     58 |     }
 259 |        |
 260 |        |     /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
 261 |  65.1k |     pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
 262 |  65.1k |         let bag = mem::replace(bag, Bag::new());
 263 |  65.1k |
 264 |  65.1k |         atomic::fence(Ordering::SeqCst);
 265 |  65.1k |
 266 |  65.1k |         let epoch = self.epoch.load(Ordering::Relaxed);
 267 |  65.1k |         self.queue.push(bag.seal(epoch), guard);
 268 |  65.1k |     }
 269 |        |
 270 |        |     /// Collects several bags from the global queue and executes deferred functions in them.
 271 |        |     ///
 272 |        |     /// Note: This may itself produce garbage and in turn allocate new bags.
 273 |        |     ///
 274 |        |     /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
 275 |        |     /// path. In other words, we want the compiler to optimize branching for the case when
 276 |        |     /// `collect()` is not called.
 277 |        |     #[cold]
 278 |  3.67M |     pub(crate) fn collect(&self, guard: &Guard) {
 279 |  3.67M |         let global_epoch = self.try_advance(guard);
 280 |        |
 281 |  3.67M |         let steps = if cfg!(crossbeam_sanitize) {
 282 |        |             usize::max_value()
 283 |        |         } else {
 284 |  3.67M |             Self::COLLECT_STEPS
 285 |        |         };
 286 |        |
 287 |  3.79M |         for _ in 0..steps {
 288 |  3.79M |             match self.queue.try_pop_if(
 289 |  3.79M |                 &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
 290 |  3.79M |                 guard,
 291 |  3.79M |             ) {
 292 |  3.79M |                 None => break,
 293 |   122k |                 Some(sealed_bag) => drop(sealed_bag),
 294 |        |             }
 295 |        |         }
 296 |  3.68M |     }
 297 |        |
 298 |        |     /// Attempts to advance the global epoch.
 299 |        |     ///
 300 |        |     /// The global epoch can advance only if all currently pinned participants have been pinned in
 301 |        |     /// the current epoch.
 302 |        |     ///
 303 |        |     /// Returns the current global epoch.
 304 |        |     ///
 305 |        |     /// `try_advance()` is annotated `#[cold]` because it is rarely called.
 306 |        |     #[cold]
 307 |  3.70M |     pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
 308 |  3.70M |         let global_epoch = self.epoch.load(Ordering::Relaxed);
 309 |  3.70M |         atomic::fence(Ordering::SeqCst);
 310 |        |
 311 |        |         // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
 312 |        |         // easy to implement in a lock-free manner. However, traversal can be slow due to cache
 313 |        |         // misses and data dependencies. We should experiment with other data structures as well.
 314 |  10.7M |         for local in self.locals.iter(&guard) {
 315 |  10.7M |             match local {
 316 |  10.7M |                 Err(IterError::Stalled) => {
 317 |        |                     // A concurrent thread stalled this iteration. That thread might also try to
 318 |        |                     // advance the epoch, in which case we leave the job to it. Otherwise, the
 319 |        |                     // epoch will not be advanced.
 320 |      0 |                     return global_epoch;
 321 |        |                 }
 322 |  10.7M |                 Ok(local) => {
 323 |  10.7M |                     let local_epoch = local.epoch.load(Ordering::Relaxed);
 324 |        |
 325 |        |                     // If the participant was pinned in a different epoch, we cannot advance the
 326 |        |                     // global epoch just yet.
 327 |  10.7M |                     if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
 328 |  3.37M |                         return global_epoch;
 329 |  7.57M |                     }
 330 |        |                 }
 331 |        |             }
 332 |        |         }
 333 |   249k |         atomic::fence(Ordering::Acquire);
 334 |   249k |
 335 |   249k |         // All pinned participants were pinned in the current global epoch.
 336 |   249k |         // Now let's advance the global epoch...
 337 |   249k |         //
 338 |   249k |         // Note that if another thread already advanced it before us, this store will simply
 339 |   249k |         // overwrite the global epoch with the same value. This is true because `try_advance` was
 340 |   249k |         // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
 341 |   249k |         // advanced two steps ahead of it.
 342 |   249k |         let new_epoch = global_epoch.successor();
 343 |   249k |         self.epoch.store(new_epoch, Ordering::Release);
 344 |   249k |         new_epoch
 345 |  3.62M |     }
 346 |        | }
 347 |        |
 348 |        | /// Participant for garbage collection.
 349 |        | pub(crate) struct Local {
 350 |        |     /// A node in the intrusive linked list of `Local`s.
 351 |        |     entry: Entry,
 352 |        |
 353 |        |     /// The local epoch.
 354 |        |     epoch: AtomicEpoch,
 355 |        |
 356 |        |     /// A reference to the global data.
 357 |        |     ///
 358 |        |     /// When all guards and handles get dropped, this reference is destroyed.
 359 |        |     collector: UnsafeCell<ManuallyDrop<Collector>>,
 360 |        |
 361 |        |     /// The local bag of deferred functions.
 362 |        |     pub(crate) bag: UnsafeCell<Bag>,
 363 |        |
 364 |        |     /// The number of guards keeping this participant pinned.
 365 |        |     guard_count: Cell<usize>,
 366 |        |
 367 |        |     /// The number of active handles.
 368 |        |     handle_count: Cell<usize>,
 369 |        |
 370 |        |     /// Total number of pinnings performed.
 371 |        |     ///
 372 |        |     /// This is just an auxiliary counter that sometimes kicks off collection.
 373 |        |     pin_count: Cell<Wrapping<usize>>,
 374 |        | }
 375 |        |
 376 |        | // Make sure `Local` is less than or equal to 2048 bytes.
 377 |        | // https://github.com/crossbeam-rs/crossbeam/issues/551
 378 |        | #[test]
 379 |      1 | fn local_size() {
 380 |        |     assert!(
 381 |      1 |         core::mem::size_of::<Local>() <= 2048,
 382 |        |         "An allocation of `Local` should be <= 2048 bytes."
 383 |        |     );
 384 |      1 | }
 385 |        |
 386 |        | impl Local {
 387 |        |     /// Number of pinnings after which a participant will execute some deferred functions from the
 388 |        |     /// global queue.
 389 |        |     const PINNINGS_BETWEEN_COLLECT: usize = 128;
 390 |        |
 391 |        |     /// Registers a new `Local` in the provided `Global`.
 392 |    216 |     pub(crate) fn register(collector: &Collector) -> LocalHandle {
 393 |    216 |         unsafe {
 394 |    216 |             // Since we dereference no pointers in this block, it is safe to use `unprotected`.
 395 |    216 |
 396 |    216 |             let local = Owned::new(Local {
 397 |    216 |                 entry: Entry::default(),
 398 |    216 |                 epoch: AtomicEpoch::new(Epoch::starting()),
 399 |    216 |                 collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
 400 |    216 |                 bag: UnsafeCell::new(Bag::new()),
 401 |    216 |                 guard_count: Cell::new(0),
 402 |    216 |                 handle_count: Cell::new(1),
 403 |    216 |                 pin_count: Cell::new(Wrapping(0)),
 404 |    216 |             })
 405 |    216 |             .into_shared(unprotected());
 406 |    216 |             collector.global.locals.insert(local, unprotected());
 407 |    216 |             LocalHandle {
 408 |    216 |                 local: local.as_raw(),
 409 |    216 |             }
 410 |    216 |         }
 411 |    216 |     }
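`Local::register` is what backs the crate's public `Collector::register()`. A short sketch of running a dedicated collector instead of the default global one (public API, assumed to match the revision measured here):

    use crossbeam_epoch::Collector;

    fn main() {
        // A dedicated collector with its own `Global` (locals list, queue, epoch).
        let collector = Collector::new();

        // Registering creates a `Local` participant and returns a handle to it.
        let handle = collector.register();

        // Pinning through the handle works like `epoch::pin()`, but against this
        // collector's global epoch rather than the default collector's.
        let guard = handle.pin();
        drop(guard);
    }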
 412 |        |
 413 |        |     /// Returns a reference to the `Global` in which this `Local` resides.
 414 |        |     #[inline]
 415 |  14.0M |     pub(crate) fn global(&self) -> &Global {
 416 |  14.0M |         &self.collector().global
 417 |  14.0M |     }
 418 |        |
 419 |        |     /// Returns a reference to the `Collector` in which this `Local` resides.
 420 |        |     #[inline]
 421 |  14.1M |     pub(crate) fn collector(&self) -> &Collector {
 422 |  14.1M |         self.collector.with(|c| unsafe { &**c })
 423 |  14.1M |     }
 424 |        |
 425 |        |     /// Returns `true` if the current participant is pinned.
 426 |        |     #[inline]
 427 |  1.01M |     pub(crate) fn is_pinned(&self) -> bool {
 428 |  1.01M |         self.guard_count.get() > 0
 429 |  1.01M |     }
 430 |        |
 431 |        |     /// Adds `deferred` to the thread-local bag.
 432 |        |     ///
 433 |        |     /// # Safety
 434 |        |     ///
 435 |        |     /// It should be safe for another thread to execute the given function.
 436 |  3.67M |     pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
 437 |  3.76M |         let bag = self.bag.with_mut(|b| &mut *b);
 438 |        |
 439 |  3.74M |         while let Err(d) = bag.try_push(deferred) {
 440 |  64.6k |             self.global().push_bag(bag, guard);
 441 |  64.6k |             deferred = d;
 442 |  64.6k |         }
 443 |  3.67M |     }
 444 |        |
 445 |    280 |     pub(crate) fn flush(&self, guard: &Guard) {
 446 |    280 |         let bag = self.bag.with_mut(|b| unsafe { &mut *b });
 447 |    280 |
 448 |    280 |         if !bag.is_empty() {
 449 |    278 |             self.global().push_bag(bag, guard);
 450 |    278 |         }
 451 |        |
 452 |    280 |         self.global().collect(guard);
 453 |    280 |     }
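`Local::flush` is reachable through the public `Guard::flush()` method. A hedged usage sketch (public API; exact behavior taken from the crate's docs):

    use crossbeam_epoch as epoch;

    fn main() {
        let guard = epoch::pin();
        // ... defer some destructors through the guard ...

        // Push the (possibly non-full) thread-local bag to the global queue now
        // and attempt a collection, instead of waiting for the bag to fill up.
        guard.flush();
    }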
 454 |        |
 455 |        |     /// Pins the `Local`.
 456 |        |     #[inline]
 457 |  13.5M |     pub(crate) fn pin(&self) -> Guard {
 458 |  13.5M |         let guard = Guard { local: self };
 459 |  13.5M |
 460 |  13.5M |         let guard_count = self.guard_count.get();
 461 |  13.5M |         self.guard_count.set(guard_count.checked_add(1).unwrap());
 462 |  13.5M |
 463 |  13.5M |         if guard_count == 0 {
 464 |  13.5M |             let global_epoch = self.global().epoch.load(Ordering::Relaxed);
 465 |  13.5M |             let new_epoch = global_epoch.pinned();
 466 |  13.5M |
 467 |  13.5M |             // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
 468 |  13.5M |             // The fence makes sure that any future loads from `Atomic`s will not happen before
 469 |  13.5M |             // this store.
 470 |  13.5M |             if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
 471 |        |                 // HACK(stjepang): On x86 architectures there are two different ways of executing
 472 |        |                 // a `SeqCst` fence.
 473 |        |                 //
 474 |        |                 // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
 475 |        |                 // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
 476 |        |                 //    instruction.
 477 |        |                 //
 478 |        |                 // Both instructions have the effect of a full barrier, but benchmarks have shown
 479 |        |                 // that the second one makes pinning faster in this particular case.  It is not
 480 |        |                 // clear that this is permitted by the C++ memory model (SC fences work very
 481 |        |                 // differently from SC accesses), but experimental evidence suggests that this
 482 |        |                 // works fine.  Using inline assembly would be a viable (and correct) alternative,
 483 |        |                 // but alas, that is not possible on stable Rust.
 484 |  12.6M |                 let current = Epoch::starting();
 485 |  12.6M |                 let res = self.epoch.compare_exchange(
 486 |  12.6M |                     current,
 487 |  12.6M |                     new_epoch,
 488 |  12.6M |                     Ordering::SeqCst,
 489 |  12.6M |                     Ordering::SeqCst,
 490 |  12.6M |                 );
 491 |  14.2M |                 debug_assert!(res.is_ok(), "participant was expected to be unpinned");
 492 |        |                 // We add a compiler fence to make it less likely for LLVM to do something wrong
 493 |        |                 // here.  Formally, this is not enough to get rid of data races; practically,
 494 |        |                 // it should go a long way.
 495 |  12.4M |                 atomic::compiler_fence(Ordering::SeqCst);
 496 |        |             } else {
 497 |        |                 self.epoch.store(new_epoch, Ordering::Relaxed);
 498 |        |                 atomic::fence(Ordering::SeqCst);
 499 |        |             }
 500 |        |
 501 |        |             // Increment the pin counter.
 502 |  12.4M |             let count = self.pin_count.get();
 503 |  12.4M |             self.pin_count.set(count + Wrapping(1));
 504 |  12.4M |
 505 |  12.4M |             // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting
 506 |  12.4M |             // some garbage.
 507 |  12.4M |             if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
 508 |   126k |                 self.global().collect(&guard);
 509 |  13.8M |             }
 510 |  32.2k |         }
 511 |        |
 512 |  13.9M |         guard
 513 |  13.9M |     }
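Because only the outermost guard (the `guard_count == 0` branch) pays for the SeqCst fence, nested pinning on the same thread is cheap. A small sketch with the public API (`epoch::is_pinned()` assumed available as in the crate's public docs):

    use crossbeam_epoch as epoch;

    fn main() {
        let outer = epoch::pin();   // outermost pin: publishes the epoch with a fence
        let inner = epoch::pin();   // nested pin: only bumps the guard counter
        assert!(epoch::is_pinned());
        drop(inner);
        drop(outer);                // last guard dropped: the participant unpins
        assert!(!epoch::is_pinned());
    }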
<crossbeam_epoch::internal::Local>::pin
Line
Count
Source
457
206
    pub(crate) fn pin(&self) -> Guard {
458
206
        let guard = Guard { local: self };
459
206
460
206
        let guard_count = self.guard_count.get();
461
206
        self.guard_count.set(guard_count.checked_add(1).unwrap());
462
206
463
206
        if guard_count == 0 {
464
206
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
465
206
            let new_epoch = global_epoch.pinned();
466
206
467
206
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
468
206
            // The fence makes sure that any future loads from `Atomic`s will not happen before
469
206
            // this store.
470
206
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
471
                // HACK(stjepang): On x86 architectures there are two different ways of executing
472
                // a `SeqCst` fence.
473
                //
474
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
475
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
476
                //    instruction.
477
                //
478
                // Both instructions have the effect of a full barrier, but benchmarks have shown
479
                // that the second one makes pinning faster in this particular case.  It is not
480
                // clear that this is permitted by the C++ memory model (SC fences work very
481
                // differently from SC accesses), but experimental evidence suggests that this
482
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
483
                // but alas, that is not possible on stable Rust.
484
206
                let current = Epoch::starting();
485
206
                let res = self.epoch.compare_exchange(
486
206
                    current,
487
206
                    new_epoch,
488
206
                    Ordering::SeqCst,
489
206
                    Ordering::SeqCst,
490
206
                );
491
206
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
492
                // We add a compiler fence to make it less likely for LLVM to do something wrong
493
                // here.  Formally, this is not enough to get rid of data races; practically,
494
                // it should go a long way.
495
206
                atomic::compiler_fence(Ordering::SeqCst);
496
            } else {
497
                self.epoch.store(new_epoch, Ordering::Relaxed);
498
                atomic::fence(Ordering::SeqCst);
499
            }
500
501
            // Increment the pin counter.
502
206
            let count = self.pin_count.get();
503
206
            self.pin_count.set(count + Wrapping(1));
504
206
505
206
            // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting
506
206
            // some garbage.
507
206
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
508
4
                self.global().collect(&guard);
509
202
            }
510
0
        }
511
512
206
        guard
513
206
    }
<crossbeam_epoch::internal::Local>::pin
Line
Count
Source
457
145
    pub(crate) fn pin(&self) -> Guard {
458
145
        let guard = Guard { local: self };
459
145
460
145
        let guard_count = self.guard_count.get();
461
145
        self.guard_count.set(guard_count.checked_add(1).unwrap());
462
145
463
145
        if guard_count == 0 {
464
145
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
465
145
            let new_epoch = global_epoch.pinned();
466
145
467
145
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
468
145
            // The fence makes sure that any future loads from `Atomic`s will not happen before
469
145
            // this store.
470
145
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
471
                // HACK(stjepang): On x86 architectures there are two different ways of executing
472
                // a `SeqCst` fence.
473
                //
474
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
475
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
476
                //    instruction.
477
                //
478
                // Both instructions have the effect of a full barrier, but benchmarks have shown
479
                // that the second one makes pinning faster in this particular case.  It is not
480
                // clear that this is permitted by the C++ memory model (SC fences work very
481
                // differently from SC accesses), but experimental evidence suggests that this
482
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
483
                // but alas, that is not possible on stable Rust.
484
145
                let current = Epoch::starting();
485
145
                let res = self.epoch.compare_exchange(
486
145
                    current,
487
145
                    new_epoch,
488
145
                    Ordering::SeqCst,
489
145
                    Ordering::SeqCst,
490
145
                );
491
145
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
492
                // We add a compiler fence to make it less likely for LLVM to do something wrong
493
                // here.  Formally, this is not enough to get rid of data races; practically,
494
                // it should go a long way.
495
146
                atomic::compiler_fence(Ordering::SeqCst);
496
            } else {
497
                self.epoch.store(new_epoch, Ordering::Relaxed);
498
                atomic::fence(Ordering::SeqCst);
499
            }
500
501
            // Increment the pin counter.
502
146
            let count = self.pin_count.get();
503
146
            self.pin_count.set(count + Wrapping(1));
504
146
505
146
            // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting
506
146
            // some garbage.
507
146
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
508
0
                self.global().collect(&guard);
509
146
            }
510
0
        }
511
512
146
        guard
513
146
    }
Unexecuted instantiation: <crossbeam_epoch::internal::Local>::pin
<crossbeam_epoch::internal::Local>::pin
Line
Count
Source
457
3
    pub(crate) fn pin(&self) -> Guard {
458
3
        let guard = Guard { local: self };
459
3
460
3
        let guard_count = self.guard_count.get();
461
3
        self.guard_count.set(guard_count.checked_add(1).unwrap());
462
3
463
3
        if guard_count == 0 {
464
3
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
465
3
            let new_epoch = global_epoch.pinned();
466
3
467
3
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
468
3
            // The fence makes sure that any future loads from `Atomic`s will not happen before
469
3
            // this store.
470
3
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
471
                // HACK(stjepang): On x86 architectures there are two different ways of executing
472
                // a `SeqCst` fence.
473
                //
474
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
475
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
476
                //    instruction.
477
                //
478
                // Both instructions have the effect of a full barrier, but benchmarks have shown
479
                // that the second one makes pinning faster in this particular case.  It is not
480
                // clear that this is permitted by the C++ memory model (SC fences work very
481
                // differently from SC accesses), but experimental evidence suggests that this
482
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
483
                // but alas, that is not possible on stable Rust.
484
3
                let current = Epoch::starting();
485
3
                let res = self.epoch.compare_exchange(
486
3
                    current,
487
3
                    new_epoch,
488
3
                    Ordering::SeqCst,
489
3
                    Ordering::SeqCst,
490
3
                );
491
3
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
492
                // We add a compiler fence to make it less likely for LLVM to do something wrong
493
                // here.  Formally, this is not enough to get rid of data races; practically,
494
                // it should go a long way.
495
3
                atomic::compiler_fence(Ordering::SeqCst);
496
            } else {
497
                self.epoch.store(new_epoch, Ordering::Relaxed);
498
                atomic::fence(Ordering::SeqCst);
499
            }
500
501
            // Increment the pin counter.
502
3
            let count = self.pin_count.get();
503
3
            self.pin_count.set(count + Wrapping(1));
504
3
505
3
            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and collecting
506
3
            // some garbage.
507
3
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
508
1
                self.global().collect(&guard);
509
2
            }
510
0
        }
511
512
3
        guard
513
3
    }
<crossbeam_epoch::internal::Local>::pin
Line
Count
Source
457
21
    pub(crate) fn pin(&self) -> Guard {
458
21
        let guard = Guard { local: self };
459
21
460
21
        let guard_count = self.guard_count.get();
461
21
        self.guard_count.set(guard_count.checked_add(1).unwrap());
462
21
463
21
        if guard_count == 0 {
464
21
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
465
21
            let new_epoch = global_epoch.pinned();
466
21
467
21
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
468
21
            // The fence makes sure that any future loads from `Atomic`s will not happen before
469
21
            // this store.
470
21
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
471
                // HACK(stjepang): On x86 architectures there are two different ways of executing
472
                // a `SeqCst` fence.
473
                //
474
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
475
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
476
                //    instruction.
477
                //
478
                // Both instructions have the effect of a full barrier, but benchmarks have shown
479
                // that the second one makes pinning faster in this particular case.  It is not
480
                // clear that this is permitted by the C++ memory model (SC fences work very
481
                // differently from SC accesses), but experimental evidence suggests that this
482
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
483
                // but alas, that is not possible on stable Rust.
484
21
                let current = Epoch::starting();
485
21
                let res = self.epoch.compare_exchange(
486
21
                    current,
487
21
                    new_epoch,
488
21
                    Ordering::SeqCst,
489
21
                    Ordering::SeqCst,
490
21
                );
491
21
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
492
                // We add a compiler fence to make it less likely for LLVM to do something wrong
493
                // here.  Formally, this is not enough to get rid of data races; practically,
494
                // it should go a long way.
495
21
                atomic::compiler_fence(Ordering::SeqCst);
496
            } else {
497
                self.epoch.store(new_epoch, Ordering::Relaxed);
498
                atomic::fence(Ordering::SeqCst);
499
            }
500
501
            // Increment the pin counter.
502
21
            let count = self.pin_count.get();
503
21
            self.pin_count.set(count + Wrapping(1));
504
21
505
21
            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and collecting
506
21
            // some garbage.
507
21
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
508
19
                self.global().collect(&guard);
509
19
            }
2
510
0
        }
511
512
21
        guard
513
21
    }
<crossbeam_epoch::internal::Local>::pin
Line
Count
Source
457
1
    pub(crate) fn pin(&self) -> Guard {
458
1
        let guard = Guard { local: self };
459
1
460
1
        let guard_count = self.guard_count.get();
461
1
        self.guard_count.set(guard_count.checked_add(1).unwrap());
462
1
463
1
        if guard_count == 0 {
464
1
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
465
1
            let new_epoch = global_epoch.pinned();
466
1
467
1
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
468
1
            // The fence makes sure that any future loads from `Atomic`s will not happen before
469
1
            // this store.
470
1
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
471
                // HACK(stjepang): On x86 architectures there are two different ways of executing
472
                // a `SeqCst` fence.
473
                //
474
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
475
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
476
                //    instruction.
477
                //
478
                // Both instructions have the effect of a full barrier, but benchmarks have shown
479
                // that the second one makes pinning faster in this particular case.  It is not
480
                // clear that this is permitted by the C++ memory model (SC fences work very
481
                // differently from SC accesses), but experimental evidence suggests that this
482
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
483
                // but alas, that is not possible on stable Rust.
484
1
                let current = Epoch::starting();
485
1
                let res = self.epoch.compare_exchange(
486
1
                    current,
487
1
                    new_epoch,
488
1
                    Ordering::SeqCst,
489
1
                    Ordering::SeqCst,
490
1
                );
491
1
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
492
                // We add a compiler fence to make it less likely for LLVM to do something wrong
493
                // here.  Formally, this is not enough to get rid of data races; practically,
494
                // it should go a long way.
495
1
                atomic::compiler_fence(Ordering::SeqCst);
496
            } else {
497
                self.epoch.store(new_epoch, Ordering::Relaxed);
498
                atomic::fence(Ordering::SeqCst);
499
            }
500
501
            // Increment the pin counter.
502
1
            let count = self.pin_count.get();
503
1
            self.pin_count.set(count + Wrapping(1));
504
1
505
1
            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and collecting
506
1
            // some garbage.
507
1
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
508
1
                self.global().collect(&guard);
509
1
            }
0
510
0
        }
511
512
1
        guard
513
1
    }
<crossbeam_epoch::internal::Local>::pin
Line
Count
Source
457
427k
    pub(crate) fn pin(&self) -> Guard {
458
427k
        let guard = Guard { local: self };
459
427k
460
427k
        let guard_count = self.guard_count.get();
461
427k
        self.guard_count.set(guard_count.checked_add(1).unwrap());
462
427k
463
427k
        if guard_count == 0 {
464
445k
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
465
445k
            let new_epoch = global_epoch.pinned();
466
445k
467
445k
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
468
445k
            // The fence makes sure that any future loads from `Atomic`s will not happen before
469
445k
            // this store.
470
445k
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
471
                // HACK(stjepang): On x86 architectures there are two different ways of executing
472
                // a `SeqCst` fence.
473
                //
474
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
475
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
476
                //    instruction.
477
                //
478
                // Both instructions have the effect of a full barrier, but benchmarks have shown
479
                // that the second one makes pinning faster in this particular case.  It is not
480
                // clear that this is permitted by the C++ memory model (SC fences work very
481
                // differently from SC accesses), but experimental evidence suggests that this
482
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
483
                // but alas, that is not possible on stable Rust.
484
431k
                let current = Epoch::starting();
485
431k
                let res = self.epoch.compare_exchange(
486
431k
                    current,
487
431k
                    new_epoch,
488
431k
                    Ordering::SeqCst,
489
431k
                    Ordering::SeqCst,
490
431k
                );
491
462k
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
492
                // We add a compiler fence to make it less likely for LLVM to do something wrong
493
                // here.  Formally, this is not enough to get rid of data races; practically,
494
                // it should go a long way.
495
423k
                atomic::compiler_fence(Ordering::SeqCst);
496
            } else {
497
                self.epoch.store(new_epoch, Ordering::Relaxed);
498
                atomic::fence(Ordering::SeqCst);
499
            }
500
501
            // Increment the pin counter.
502
423k
            let count = self.pin_count.get();
503
423k
            self.pin_count.set(count + Wrapping(1));
504
423k
505
423k
            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and collecting
506
423k
            // some garbage.
507
423k
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
508
4.15k
                self.global().collect(&guard);
509
440k
            }
510
18.4E
        }
511
512
426k
        guard
513
426k
    }
<crossbeam_epoch::internal::Local>::pin
Line
Count
Source
457
14
    pub(crate) fn pin(&self) -> Guard {
458
14
        let guard = Guard { local: self };
459
14
460
14
        let guard_count = self.guard_count.get();
461
14
        self.guard_count.set(guard_count.checked_add(1).unwrap());
462
14
463
14
        if guard_count == 0 {
464
14
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
465
14
            let new_epoch = global_epoch.pinned();
466
14
467
14
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
468
14
            // The fence makes sure that any future loads from `Atomic`s will not happen before
469
14
            // this store.
470
14
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
471
                // HACK(stjepang): On x86 architectures there are two different ways of executing
472
                // a `SeqCst` fence.
473
                //
474
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
475
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
476
                //    instruction.
477
                //
478
                // Both instructions have the effect of a full barrier, but benchmarks have shown
479
                // that the second one makes pinning faster in this particular case.  It is not
480
                // clear that this is permitted by the C++ memory model (SC fences work very
481
                // differently from SC accesses), but experimental evidence suggests that this
482
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
483
                // but alas, that is not possible on stable Rust.
484
14
                let current = Epoch::starting();
485
14
                let res = self.epoch.compare_exchange(
486
14
                    current,
487
14
                    new_epoch,
488
14
                    Ordering::SeqCst,
489
14
                    Ordering::SeqCst,
490
14
                );
491
14
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
492
                // We add a compiler fence to make it less likely for LLVM to do something wrong
493
                // here.  Formally, this is not enough to get rid of data races; practically,
494
                // it should go a long way.
495
14
                atomic::compiler_fence(Ordering::SeqCst);
496
            } else {
497
                self.epoch.store(new_epoch, Ordering::Relaxed);
498
                atomic::fence(Ordering::SeqCst);
499
            }
500
501
            // Increment the pin counter.
502
14
            let count = self.pin_count.get();
503
14
            self.pin_count.set(count + Wrapping(1));
504
14
505
14
            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and collecting
506
14
            // some garbage.
507
14
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
508
10
                self.global().collect(&guard);
509
10
            }
4
510
0
        }
511
512
14
        guard
513
14
    }
<crossbeam_epoch::internal::Local>::pin
Line
Count
Source
457
563k
    pub(crate) fn pin(&self) -> Guard {
458
563k
        let guard = Guard { local: self };
459
563k
460
563k
        let guard_count = self.guard_count.get();
461
563k
        self.guard_count.set(guard_count.checked_add(1).unwrap());
462
563k
463
563k
        if guard_count == 0 {
464
590k
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
465
590k
            let new_epoch = global_epoch.pinned();
466
590k
467
590k
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
468
590k
            // The fence makes sure that any future loads from `Atomic`s will not happen before
469
590k
            // this store.
470
590k
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
471
                // HACK(stjepang): On x86 architectures there are two different ways of executing
472
                // a `SeqCst` fence.
473
                //
474
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
475
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
476
                //    instruction.
477
                //
478
                // Both instructions have the effect of a full barrier, but benchmarks have shown
479
                // that the second one makes pinning faster in this particular case.  It is not
480
                // clear that this is permitted by the C++ memory model (SC fences work very
481
                // differently from SC accesses), but experimental evidence suggests that this
482
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
483
                // but alas, that is not possible on stable Rust.
484
559k
                let current = Epoch::starting();
485
559k
                let res = self.epoch.compare_exchange(
486
559k
                    current,
487
559k
                    new_epoch,
488
559k
                    Ordering::SeqCst,
489
559k
                    Ordering::SeqCst,
490
559k
                );
491
624k
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
492
                // We add a compiler fence to make it less likely for LLVM to do something wrong
493
                // here.  Formally, this is not enough to get rid of data races; practically,
494
                // it should go a long way.
495
450k
                atomic::compiler_fence(Ordering::SeqCst);
496
            } else {
497
                self.epoch.store(new_epoch, Ordering::Relaxed);
498
                atomic::fence(Ordering::SeqCst);
499
            }
500
501
            // Increment the pin counter.
502
450k
            let count = self.pin_count.get();
503
450k
            self.pin_count.set(count + Wrapping(1));
504
450k
505
450k
            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and collecting
506
450k
            // some garbage.
507
450k
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
508
5.41k
                self.global().collect(&guard);
509
447k
            }
510
18.4E
        }
511
512
426k
        guard
513
426k
    }
<crossbeam_epoch::internal::Local>::pin
Line
Count
Source
457
12.5M
    pub(crate) fn pin(&self) -> Guard {
458
12.5M
        let guard = Guard { local: self };
459
12.5M
460
12.5M
        let guard_count = self.guard_count.get();
461
12.5M
        self.guard_count.set(guard_count.checked_add(1).unwrap());
462
12.5M
463
12.5M
        if guard_count == 0 {
464
12.4M
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
465
12.4M
            let new_epoch = global_epoch.pinned();
466
12.4M
467
12.4M
            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
468
12.4M
            // The fence makes sure that any future loads from `Atomic`s will not happen before
469
12.4M
            // this store.
470
12.4M
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
471
                // HACK(stjepang): On x86 architectures there are two different ways of executing
472
                // a `SeqCst` fence.
473
                //
474
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
475
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
476
                //    instruction.
477
                //
478
                // Both instructions have the effect of a full barrier, but benchmarks have shown
479
                // that the second one makes pinning faster in this particular case.  It is not
480
                // clear that this is permitted by the C++ memory model (SC fences work very
481
                // differently from SC accesses), but experimental evidence suggests that this
482
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
483
                // but alas, that is not possible on stable Rust.
484
11.6M
                let current = Epoch::starting();
485
11.6M
                let res = self.epoch.compare_exchange(
486
11.6M
                    current,
487
11.6M
                    new_epoch,
488
11.6M
                    Ordering::SeqCst,
489
11.6M
                    Ordering::SeqCst,
490
11.6M
                );
491
13.1M
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
492
                // We add a compiler fence to make it less likely for LLVM to do something wrong
493
                // here.  Formally, this is not enough to get rid of data races; practically,
494
                // it should go a long way.
495
11.5M
                atomic::compiler_fence(Ordering::SeqCst);
496
            } else {
497
                self.epoch.store(new_epoch, Ordering::Relaxed);
498
                atomic::fence(Ordering::SeqCst);
499
            }
500
501
            // Increment the pin counter.
502
11.5M
            let count = self.pin_count.get();
503
11.5M
            self.pin_count.set(count + Wrapping(1));
504
11.5M
505
11.5M
            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and collecting
506
11.5M
            // some garbage.
507
11.5M
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
508
116k
                self.global().collect(&guard);
509
12.9M
            }
510
76.9k
        }
511
512
13.1M
        guard
513
13.1M
    }
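
The block above hinges on the x86-specific observation that a `SeqCst` read-modify-write can stand in for an explicit `SeqCst` fence. Below is a minimal, std-only sketch of the two idioms being compared; it is not part of the report, and `LOCAL_EPOCH` plus the function names are illustrative stand-ins for `self.epoch` and the pinning path:

    use std::sync::atomic::{fence, AtomicUsize, Ordering};

    static LOCAL_EPOCH: AtomicUsize = AtomicUsize::new(0);

    fn publish_epoch_with_fence(new_epoch: usize) {
        // Variant 1: relaxed store followed by a `SeqCst` fence (an `mfence` on x86).
        LOCAL_EPOCH.store(new_epoch, Ordering::Relaxed);
        fence(Ordering::SeqCst);
    }

    fn publish_epoch_with_cmpxchg(new_epoch: usize) {
        // Variant 2: a `SeqCst` compare_exchange from the known "unpinned" value
        // (a `lock cmpxchg` on x86), which also acts as a full barrier.
        let res = LOCAL_EPOCH.compare_exchange(0, new_epoch, Ordering::SeqCst, Ordering::SeqCst);
        debug_assert!(res.is_ok(), "expected the local epoch to be in the unpinned state");
    }

    fn main() {
        publish_epoch_with_cmpxchg(1); // starts from the "unpinned" value 0
        publish_epoch_with_fence(2);
    }

Both variants act as full barriers on x86; the code above prefers the compare_exchange form purely because it benchmarked faster on this hot path, as the inline comment explains.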
514
515
    /// Unpins the `Local`.
516
    #[inline]
517
13.5M
    pub(crate) fn unpin(&self) {
518
13.5M
        let guard_count = self.guard_count.get();
519
13.5M
        self.guard_count.set(guard_count - 1);
520
13.5M
521
13.5M
        if guard_count == 1 {
522
13.5M
            self.epoch.store(Epoch::starting(), Ordering::Release);
523
13.5M
524
13.5M
            if self.handle_count.get() == 0 {
525
0
                self.finalize();
526
13.7M
            }
527
18.4E
        }
528
13.7M
    }
<crossbeam_epoch::internal::Local>::unpin
Line
Count
Source
517
1.01M
    pub(crate) fn unpin(&self) {
518
1.01M
        let guard_count = self.guard_count.get();
519
1.01M
        self.guard_count.set(guard_count - 1);
520
1.01M
521
1.01M
        if guard_count == 1 {
522
1.01M
            self.epoch.store(Epoch::starting(), Ordering::Release);
523
1.01M
524
1.01M
            if self.handle_count.get() == 0 {
525
0
                self.finalize();
526
1.00M
            }
527
18.4E
        }
528
997k
    }
<crossbeam_epoch::internal::Local>::unpin
Line
Count
Source
517
12.5M
    pub(crate) fn unpin(&self) {
518
12.5M
        let guard_count = self.guard_count.get();
519
12.5M
        self.guard_count.set(guard_count - 1);
520
12.5M
521
12.5M
        if guard_count == 1 {
522
12.5M
            self.epoch.store(Epoch::starting(), Ordering::Release);
523
12.5M
524
12.5M
            if self.handle_count.get() == 0 {
525
0
                self.finalize();
526
12.7M
            }
527
18.4E
        }
528
12.7M
    }
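
Because `pin()` only publishes the epoch when `guard_count` goes from 0 to 1, and `unpin()` (line 517 onwards) only clears it when the count returns to 0, nested guards are cheap. A small usage sketch of that behaviour through the public crate API, assuming the `crossbeam-epoch` crate; the counter values in the comments are illustrative:

    use crossbeam_epoch as epoch;

    fn nested_pins() {
        let outer = epoch::pin();       // guard_count: 0 -> 1, local epoch set to "pinned"
        {
            let inner = epoch::pin();   // guard_count: 1 -> 2, epoch left untouched
            drop(inner);                // guard_count: 2 -> 1, still pinned
        }
        drop(outer);                    // guard_count: 1 -> 0, epoch reset to the starting value
    }

    fn main() {
        nested_pins();
    }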
529
530
    /// Unpins and then pins the `Local`.
531
    #[inline]
532
3
    pub(crate) fn repin(&self) {
533
3
        let guard_count = self.guard_count.get();
534
3
535
3
        // Update the local epoch only if there's only one guard.
536
3
        if guard_count == 1 {
537
3
            let epoch = self.epoch.load(Ordering::Relaxed);
538
3
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();
539
3
540
3
            // Update the local epoch only if the global epoch is greater than the local epoch.
541
3
            if epoch != global_epoch {
542
3
                // We store the new epoch with `Release` because we need to ensure any memory
543
3
                // accesses from the previous epoch do not leak into the new one.
544
3
                self.epoch.store(global_epoch, Ordering::Release);
545
3
546
3
                // However, we don't need a following `SeqCst` fence, because it is safe for memory
547
3
                // accesses from the new epoch to be executed before updating the local epoch. At
548
3
                // worst, other threads will see the new epoch late and delay GC slightly.
549
3
            }
0
550
0
        }
551
3
    }
<crossbeam_epoch::internal::Local>::repin
Line
Count
Source
532
1
    pub(crate) fn repin(&self) {
533
1
        let guard_count = self.guard_count.get();
534
1
535
1
        // Update the local epoch only if there's only one guard.
536
1
        if guard_count == 1 {
537
1
            let epoch = self.epoch.load(Ordering::Relaxed);
538
1
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();
539
1
540
1
            // Update the local epoch only if the global epoch is greater than the local epoch.
541
1
            if epoch != global_epoch {
542
1
                // We store the new epoch with `Release` because we need to ensure any memory
543
1
                // accesses from the previous epoch do not leak into the new one.
544
1
                self.epoch.store(global_epoch, Ordering::Release);
545
1
546
1
                // However, we don't need a following `SeqCst` fence, because it is safe for memory
547
1
                // accesses from the new epoch to be executed before updating the local epoch. At
548
1
                // worst, other threads will see the new epoch late and delay GC slightly.
549
1
            }
0
550
0
        }
551
1
    }
<crossbeam_epoch::internal::Local>::repin
Line
Count
Source
532
2
    pub(crate) fn repin(&self) {
533
2
        let guard_count = self.guard_count.get();
534
2
535
2
        // Update the local epoch only if there's only one guard.
536
2
        if guard_count == 1 {
537
2
            let epoch = self.epoch.load(Ordering::Relaxed);
538
2
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();
539
2
540
2
            // Update the local epoch only if the global epoch is greater than the local epoch.
541
2
            if epoch != global_epoch {
542
2
                // We store the new epoch with `Release` because we need to ensure any memory
543
2
                // accesses from the previous epoch do not leak into the new one.
544
2
                self.epoch.store(global_epoch, Ordering::Release);
545
2
546
2
                // However, we don't need a following `SeqCst` fence, because it is safe for memory
547
2
                // accesses from the new epoch to be executed before updating the local epoch. At
548
2
                // worst, other threads will see the new epoch late and delay GC slightly.
549
2
            }
0
550
0
        }
551
2
    }
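
The `repin()` path above is normally reached through `Guard::repin()` in the public API: a long-lived guard refreshes its local epoch without fully unpinning, so it stops holding back epoch advancement. A hedged sketch, assuming the `crossbeam-epoch` crate; the batch loop and its bound are made up for illustration:

    use crossbeam_epoch as epoch;

    fn long_running_scan() {
        let mut guard = epoch::pin();
        for _batch in 0..1_000 {
            // ... process one batch of shared nodes under `guard` ...

            // Periodically refresh the local epoch so other threads' garbage
            // collection is not held back by this long-lived guard.
            guard.repin();
        }
        drop(guard);
    }

    fn main() {
        long_running_scan();
    }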
552
553
    /// Increments the handle count.
554
    #[inline]
555
0
    pub(crate) fn acquire_handle(&self) {
556
0
        let handle_count = self.handle_count.get();
557
0
        debug_assert!(handle_count >= 1);
558
0
        self.handle_count.set(handle_count + 1);
559
0
    }
560
561
    /// Decrements the handle count.
562
    #[inline]
563
215
    pub(crate) fn release_handle(&self) {
564
215
        let guard_count = self.guard_count.get();
565
215
        let handle_count = self.handle_count.get();
566
215
        debug_assert!(handle_count >= 1);
567
215
        self.handle_count.set(handle_count - 1);
568
569
215
        if guard_count == 0 && handle_count == 1 {
570
215
            self.finalize();
571
215
        }
0
572
215
    }
<crossbeam_epoch::internal::Local>::release_handle
Line
Count
Source
563
145
    pub(crate) fn release_handle(&self) {
564
145
        let guard_count = self.guard_count.get();
565
145
        let handle_count = self.handle_count.get();
566
145
        debug_assert!(handle_count >= 1);
567
145
        self.handle_count.set(handle_count - 1);
568
569
145
        if guard_count == 0 && handle_count == 1 {
570
145
            self.finalize();
571
145
        }
0
572
145
    }
<crossbeam_epoch::internal::Local>::release_handle
Line
Count
Source
563
70
    pub(crate) fn release_handle(&self) {
564
70
        let guard_count = self.guard_count.get();
565
70
        let handle_count = self.handle_count.get();
566
70
        debug_assert!(handle_count >= 1);
567
70
        self.handle_count.set(handle_count - 1);
568
569
70
        if guard_count == 0 && handle_count == 1 {
570
70
            self.finalize();
571
70
        }
0
572
70
    }
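
Taken together, `acquire_handle()`, `release_handle()` and the guard count decide when `finalize()` may run: a participant is torn down only once its last handle is dropped while no guard is active. A sketch of that lifecycle using the public `Collector`/`LocalHandle` API, assuming the `crossbeam-epoch` crate; the comments map each step onto the counters above:

    use crossbeam_epoch::Collector;

    fn main() {
        let collector = Collector::new();
        let handle = collector.register(); // participant registered, handle count held by `handle`
        {
            let _guard = handle.pin();     // guard_count > 0, so finalize() cannot run here
        }                                  // guard dropped; handle still alive, no finalization
        drop(handle);                      // release_handle(): count reaches 0 with no guards -> finalize()
        drop(collector);                   // possibly the last reference to the global data
    }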
573
574
    /// Removes the `Local` from the global linked list.
575
    #[cold]
576
215
    fn finalize(&self) {
577
215
        debug_assert_eq!(self.guard_count.get(), 0);
578
215
        debug_assert_eq!(self.handle_count.get(), 0);
579
580
        // Temporarily increment handle count. This is required so that the following call to `pin`
581
        // doesn't call `finalize` again.
582
215
        self.handle_count.set(1);
583
215
        unsafe {
584
215
            // Pin and move the local bag into the global queue. It's important that `push_bag`
585
215
            // doesn't defer destruction on any new garbage.
586
215
            let guard = &self.pin();
587
215
            self.global()
588
215
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
<crossbeam_epoch::internal::Local>::finalize::{closure#0}
Line
Count
Source
588
145
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
<crossbeam_epoch::internal::Local>::finalize::{closure#0}
Line
Count
Source
588
70
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
589
215
        }
590
215
        // Revert the handle count back to zero.
591
215
        self.handle_count.set(0);
592
215
593
215
        unsafe {
594
215
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
595
215
            // by a guard at this time, it's crucial that the reference is read before marking the
596
215
            // `Local` as deleted.
597
216
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
<crossbeam_epoch::internal::Local>::finalize::{closure#1}
Line
Count
Source
597
146
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
<crossbeam_epoch::internal::Local>::finalize::{closure#1}
Line
Count
Source
597
70
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
598
215
599
215
            // Mark this node in the linked list as deleted.
600
215
            self.entry.delete(unprotected());
601
215
602
215
            // Finally, drop the reference to the global. Note that this might be the last reference
603
215
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
604
215
            // in its queue will be executed.
605
215
            drop(collector);
606
215
        }
607
215
    }
<crossbeam_epoch::internal::Local>::finalize
Line
Count
Source
576
145
    fn finalize(&self) {
577
145
        debug_assert_eq!(self.guard_count.get(), 0);
578
145
        debug_assert_eq!(self.handle_count.get(), 0);
579
580
        // Temporarily increment handle count. This is required so that the following call to `pin`
581
        // doesn't call `finalize` again.
582
145
        self.handle_count.set(1);
583
145
        unsafe {
584
145
            // Pin and move the local bag into the global queue. It's important that `push_bag`
585
145
            // doesn't defer destruction on any new garbage.
586
145
            let guard = &self.pin();
587
145
            self.global()
588
145
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
589
145
        }
590
145
        // Revert the handle count back to zero.
591
145
        self.handle_count.set(0);
592
145
593
145
        unsafe {
594
145
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
595
145
            // by a guard at this time, it's crucial that the reference is read before marking the
596
145
            // `Local` as deleted.
597
145
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
598
145
599
145
            // Mark this node in the linked list as deleted.
600
145
            self.entry.delete(unprotected());
601
145
602
145
            // Finally, drop the reference to the global. Note that this might be the last reference
603
145
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
604
145
            // in its queue will be executed.
605
145
            drop(collector);
606
145
        }
607
145
    }
<crossbeam_epoch::internal::Local>::finalize
Line
Count
Source
576
70
    fn finalize(&self) {
577
70
        debug_assert_eq!(self.guard_count.get(), 0);
578
70
        debug_assert_eq!(self.handle_count.get(), 0);
579
580
        // Temporarily increment handle count. This is required so that the following call to `pin`
581
        // doesn't call `finalize` again.
582
70
        self.handle_count.set(1);
583
70
        unsafe {
584
70
            // Pin and move the local bag into the global queue. It's important that `push_bag`
585
70
            // doesn't defer destruction on any new garbage.
586
70
            let guard = &self.pin();
587
70
            self.global()
588
70
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
589
70
        }
590
70
        // Revert the handle count back to zero.
591
70
        self.handle_count.set(0);
592
70
593
70
        unsafe {
594
70
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
595
70
            // by a guard at this time, it's crucial that the reference is read before marking the
596
70
            // `Local` as deleted.
597
70
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
598
70
599
70
            // Mark this node in the linked list as deleted.
600
70
            self.entry.delete(unprotected());
601
70
602
70
            // Finally, drop the reference to the global. Note that this might be the last reference
603
70
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
604
70
            // in its queue will be executed.
605
70
            drop(collector);
606
70
        }
607
70
    }
608
}
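
The `handle_count.set(1)` / `set(0)` dance in `finalize()` is a small reentrancy guard: the nested `pin()`/`unpin()` pair used to flush the bag must not observe a zero handle count, or it would call `finalize()` again. A std-only sketch of just that pattern, where `Participant` and the elided flush step are illustrative placeholders:

    use std::cell::Cell;

    struct Participant {
        handle_count: Cell<usize>,
    }

    impl Participant {
        fn finalize(&self) {
            debug_assert_eq!(self.handle_count.get(), 0);

            // Temporarily raise the handle count so that the nested pin()/unpin()
            // used to flush the local bag does not re-enter finalize().
            self.handle_count.set(1);
            // ... pin, push the local bag into the global queue, unpin ...
            self.handle_count.set(0);
        }
    }

    fn main() {
        let p = Participant { handle_count: Cell::new(0) };
        p.finalize();
    }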
609
610
impl IsElement<Local> for Local {
611
216
    fn entry_of(local: &Local) -> &Entry {
612
216
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
613
216
        unsafe { &*entry_ptr }
614
216
    }
<crossbeam_epoch::internal::Local as crossbeam_epoch::sync::list::IsElement<crossbeam_epoch::internal::Local>>::entry_of
Line
Count
Source
611
146
    fn entry_of(local: &Local) -> &Entry {
612
146
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
613
146
        unsafe { &*entry_ptr }
614
146
    }
<crossbeam_epoch::internal::Local as crossbeam_epoch::sync::list::IsElement<crossbeam_epoch::internal::Local>>::entry_of
Line
Count
Source
611
70
    fn entry_of(local: &Local) -> &Entry {
612
70
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
613
70
        unsafe { &*entry_ptr }
614
70
    }
615
616
10.8M
    unsafe fn element_of(entry: &Entry) -> &Local {
617
10.8M
        // offset_of! macro uses unsafe, but it's unnecessary in this context.
618
10.8M
        #[allow(unused_unsafe)]
619
10.8M
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
620
10.8M
        &*local_ptr
621
10.8M
    }
<crossbeam_epoch::internal::Local as crossbeam_epoch::sync::list::IsElement<crossbeam_epoch::internal::Local>>::element_of
Line
Count
Source
616
86.3k
    unsafe fn element_of(entry: &Entry) -> &Local {
617
86.3k
        // offset_of! macro uses unsafe, but it's unnecessary in this context.
618
86.3k
        #[allow(unused_unsafe)]
619
86.3k
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
620
86.3k
        &*local_ptr
621
86.3k
    }
<crossbeam_epoch::internal::Local as crossbeam_epoch::sync::list::IsElement<crossbeam_epoch::internal::Local>>::element_of
Line
Count
Source
616
10.7M
    unsafe fn element_of(entry: &Entry) -> &Local {
617
10.7M
        // offset_of! macro uses unsafe, but it's unnecessary in this context.
618
10.7M
        #[allow(unused_unsafe)]
619
10.7M
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
620
10.7M
        &*local_ptr
621
10.7M
    }
622
623
166
    unsafe fn finalize(entry: &Entry, guard: &Guard) {
624
166
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
625
166
    }
<crossbeam_epoch::internal::Local as crossbeam_epoch::sync::list::IsElement<crossbeam_epoch::internal::Local>>::finalize
Line
Count
Source
623
99
    unsafe fn finalize(entry: &Entry, guard: &Guard) {
624
99
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
625
99
    }
<crossbeam_epoch::internal::Local as crossbeam_epoch::sync::list::IsElement<crossbeam_epoch::internal::Local>>::finalize
Line
Count
Source
623
67
    unsafe fn finalize(entry: &Entry, guard: &Guard) {
624
67
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
625
67
    }
626
}
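
`entry_of()` and `element_of()` are the two halves of the classic intrusive-list container-of conversion: add the field offset to go from the element to the embedded `Entry`, subtract it to go back. A self-contained sketch of the same pattern, assuming the `memoffset` crate the module already uses; `Node`, its fields, and `node_of` are illustrative names rather than the real types:

    use memoffset::offset_of;

    struct Entry {
        _next: usize, // stand-in for the real intrusive link
    }

    struct Node {
        _value: u64,
        entry: Entry, // intrusive hook embedded in the element
    }

    // &element -> &hook, as in entry_of().
    fn entry_of(node: &Node) -> &Entry {
        let entry_ptr = (node as *const Node as usize + offset_of!(Node, entry)) as *const Entry;
        unsafe { &*entry_ptr }
    }

    // &hook -> &element, as in element_of(); only sound if `entry` really is
    // embedded in a `Node`, which is why the real trait method is unsafe.
    unsafe fn node_of(entry: &Entry) -> &Node {
        let node_ptr = (entry as *const Entry as usize - offset_of!(Node, entry)) as *const Node;
        &*node_ptr
    }

    fn main() {
        let node = Node { _value: 7, entry: Entry { _next: 0 } };
        let e = entry_of(&node);
        let n = unsafe { node_of(e) };
        assert!(std::ptr::eq(n, &node));
    }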
627
628
#[cfg(all(test, not(loom_crossbeam)))]
629
mod tests {
630
    use std::sync::atomic::{AtomicUsize, Ordering};
631
632
    use super::*;
633
634
    #[test]
635
1
    fn check_defer() {
crossbeam_epoch::internal::tests::check_defer::{closure#0}
Line
Count
Source
635
1
    fn check_defer() {
636
        static FLAG: AtomicUsize = AtomicUsize::new(0);
637
1
        fn set() {
638
1
            FLAG.store(42, Ordering::Relaxed);
639
1
        }
640
641
1
        let d = Deferred::new(set);
642
1
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
643
1
        d.call();
644
1
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
645
1
    }
crossbeam_epoch::internal::tests::check_defer
Line
Count
Source
635
1
    fn check_defer() {
636
        static FLAG: AtomicUsize = AtomicUsize::new(0);
637
        fn set() {
638
            FLAG.store(42, Ordering::Relaxed);
639
        }
640
641
1
        let d = Deferred::new(set);
642
1
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
643
1
        d.call();
644
1
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
645
1
    }
646
647
    #[test]
648
1
    fn check_bag() {
crossbeam_epoch::internal::tests::check_bag::{closure#0}
Line
Count
Source
648
1
    fn check_bag() {
649
        static FLAG: AtomicUsize = AtomicUsize::new(0);
650
62
        fn incr() {
651
62
            FLAG.fetch_add(1, Ordering::Relaxed);
652
62
        }
653
654
1
        let mut bag = Bag::new();
655
1
        assert!(bag.is_empty());
656
657
63
        for _ in 0..MAX_OBJECTS {
658
62
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
659
62
            assert!(!bag.is_empty());
660
62
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
661
        }
662
663
1
        let result = unsafe { bag.try_push(Deferred::new(incr)) };
664
1
        assert!(result.is_err());
665
1
        assert!(!bag.is_empty());
666
1
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
667
668
1
        drop(bag);
669
1
        assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
670
1
    }
crossbeam_epoch::internal::tests::check_bag
Line
Count
Source
648
1
    fn check_bag() {
649
        static FLAG: AtomicUsize = AtomicUsize::new(0);
650
        fn incr() {
651
            FLAG.fetch_add(1, Ordering::Relaxed);
652
        }
653
654
1
        let mut bag = Bag::new();
655
1
        assert!(bag.is_empty());
656
657
63
        for _ in 0..MAX_OBJECTS {
658
62
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
659
62
            assert!(!bag.is_empty());
660
62
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
661
        }
662
663
1
        let result = unsafe { bag.try_push(Deferred::new(incr)) };
664
1
        assert!(result.is_err());
665
1
        assert!(!bag.is_empty());
666
1
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
667
668
1
        drop(bag);
669
1
        assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
670
1
    }
671
}
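
The tests above exercise `Deferred` and `Bag` directly; through the public API the same machinery is reached via `Guard::defer`, which pushes the closure into the thread-local bag described at the top of the module. A sketch, assuming the `crossbeam-epoch` crate; there is no guarantee about when the closure actually runs:

    use std::sync::atomic::{AtomicUsize, Ordering};

    static CALLS: AtomicUsize = AtomicUsize::new(0);

    fn main() {
        let guard = crossbeam_epoch::pin();
        // The closure is stashed in the thread-local bag, much like the Deferred
        // values pushed in check_bag(); it runs at some later collection point.
        guard.defer(|| {
            CALLS.fetch_add(1, Ordering::Relaxed);
        });
        drop(guard);
        // The deferred closure may not have run yet; bag flushing and epoch
        // advancement decide when it is actually executed.
    }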