generic_epoch_based.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef GENERIC_EPOCH_BASED_IMPL
  #error "This is an impl file and must not be included directly!"
#endif

#include <algorithm> // for std::any_of / std::min
#include <array>
#include <xenium/detail/port.hpp>

#ifdef _MSC_VER
  #pragma warning(push)
  #pragma warning(disable : 4127) // conditional expression is constant
#endif

namespace xenium::reclamation {
namespace scan {
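/**
 * Scan all threads (default behaviour in EBR/NEBR).
 */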
struct all_threads {
  template <class Reclaimer>
  struct type {
    bool scan(typename Reclaimer::epoch_t epoch) {
      auto prevents_update = [epoch](const typename Reclaimer::thread_control_block& data) -> bool {
        // TSan does not support explicit fences, so we cannot rely on the acquire-fence (6)
        // but have to perform an acquire-load here to avoid false positives.
        constexpr auto memory_order = TSAN_MEMORY_ORDER(std::memory_order_acquire, std::memory_order_relaxed);
        return data.is_in_critical_region.load(memory_order) && data.local_epoch.load(memory_order) != epoch;
      };

      // If any thread hasn't advanced to the current epoch, abort the attempt.
      return !std::any_of(
        Reclaimer::global_thread_block_list.begin(), Reclaimer::global_thread_block_list.end(), prevents_update);
    }

    void reset() {}
  };
};

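/**
 * Scan N threads.
 */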
template <unsigned N>
struct n_threads {
  template <class Reclaimer>
  struct type {
    type() { reset(); }

    bool scan(typename Reclaimer::epoch_t epoch) {
      for (unsigned i = 0; i < N; ++i) {
        // TSan does not support explicit fences, so we cannot rely on the acquire-fence (6)
        // but have to perform an acquire-load here to avoid false positives.
        constexpr auto memory_order = TSAN_MEMORY_ORDER(std::memory_order_acquire, std::memory_order_relaxed);
        if (!thread_iterator->is_in_critical_region.load(memory_order) ||
            thread_iterator->local_epoch.load(memory_order) == epoch) {
          if (++thread_iterator == Reclaimer::global_thread_block_list.end()) {
            return true;
          }
        }
      }
      return false;
    }

    void reset() { thread_iterator = Reclaimer::global_thread_block_list.begin(); }

  private:
    typename detail::thread_block_list<typename Reclaimer::thread_control_block>::iterator thread_iterator;
  };
};

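/**
 * Scan a single thread (default behaviour in DEBRA).
 */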
struct one_thread {
  template <class Reclaimer>
  using type = n_threads<1>::type<Reclaimer>;
};
} // namespace scan

namespace abandon {
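/**
 * Never abandon any nodes (except when the thread terminates).
 */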
struct never {
  using retire_list = detail::retire_list<>;
  static void apply(retire_list&, detail::orphan_list<>&) {}
};

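/**
 * Always abandon the remaining retired nodes when the thread leaves
 * its critical region.
 */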
struct always {
  using retire_list = detail::retire_list<>;
  static void apply(retire_list& retire_list, detail::orphan_list<>& orphans) {
    if (!retire_list.empty()) {
      orphans.add(retire_list.steal());
    }
  }
};

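/**
 * Abandon the retired nodes upon leaving the critical region when the number
 * of nodes exceeds the specified threshold.
 */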
template <size_t Threshold>
struct when_exceeds_threshold {
  using retire_list = detail::counting_retire_list<>;
  static void apply(retire_list& retire_list, detail::orphan_list<>& orphans) {
    if (retire_list.size() >= Threshold) {
      orphans.add(retire_list.steal());
    }
  }
};
} // namespace abandon

template <class Traits>
generic_epoch_based<Traits>::region_guard::region_guard() noexcept {
  local_thread_data.enter_region();
}

template <class Traits>
generic_epoch_based<Traits>::region_guard::~region_guard() noexcept {
  local_thread_data.leave_region();
}

template <class Traits>
template <class T, class MarkedPtr>
generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::guard_ptr(const MarkedPtr& p) noexcept : base(p) {
  if (this->ptr) {
    local_thread_data.enter_critical();
  }
}

template <class Traits>
template <class T, class MarkedPtr>
generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::guard_ptr(const guard_ptr& p) noexcept :
  guard_ptr(MarkedPtr(p)) {}

template <class Traits>
template <class T, class MarkedPtr>
generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::guard_ptr(guard_ptr&& p) noexcept : base(p.ptr) {
  p.ptr.reset();
}

template <class Traits>
template <class T, class MarkedPtr>
auto generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::operator=(const guard_ptr& p) noexcept -> guard_ptr& {
  if (&p == this) {
    return *this;
  }

  reset();
  this->ptr = p.ptr;
  if (this->ptr) {
    local_thread_data.enter_critical();
  }

  return *this;
}

template <class Traits>
template <class T, class MarkedPtr>
auto generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::operator=(guard_ptr&& p) noexcept -> guard_ptr& {
  if (&p == this) {
    return *this;
  }

  reset();
  this->ptr = std::move(p.ptr);
  p.ptr.reset();

  return *this;
}

template <class Traits>
template <class T, class MarkedPtr>
void generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::acquire(const concurrent_ptr<T>& p,
                                                                   std::memory_order order) noexcept {
  if (p.load(std::memory_order_relaxed) == nullptr) {
    reset();
    return;
  }

  if (!this->ptr) {
    local_thread_data.enter_critical();
  }
  // (1) - this load operation potentially synchronizes-with any release operation on p.
  this->ptr = p.load(order);
  if (!this->ptr) {
    local_thread_data.leave_critical();
  }
}

template <class Traits>
template <class T, class MarkedPtr>
bool generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::acquire_if_equal(const concurrent_ptr<T>& p,
                                                                            const MarkedPtr& expected,
                                                                            std::memory_order order) noexcept {
  auto actual = p.load(std::memory_order_relaxed);
  if (actual == nullptr || actual != expected) {
    reset();
    return actual == expected;
  }

  if (!this->ptr) {
    local_thread_data.enter_critical();
  }
  // (2) - this load operation potentially synchronizes-with any release operation on p.
  this->ptr = p.load(order);
  if (!this->ptr || this->ptr != expected) {
    local_thread_data.leave_critical();
    this->ptr.reset();
  }

  return this->ptr == expected;
}

template <class Traits>
template <class T, class MarkedPtr>
void generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::reset() noexcept {
  if (this->ptr) {
    local_thread_data.leave_critical();
  }
  this->ptr.reset();
}

template <class Traits>
template <class T, class MarkedPtr>
void generic_epoch_based<Traits>::guard_ptr<T, MarkedPtr>::reclaim(Deleter d) noexcept {
  this->ptr->set_deleter(std::move(d));
  local_thread_data.add_retired_node(this->ptr.get());
  reset();
}

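// Per-thread control block, published in the global thread list. Other threads
// read is_in_critical_region and local_epoch during scan() to decide whether
// the global epoch can be advanced.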
template <class Traits>
struct generic_epoch_based<Traits>::thread_control_block : detail::thread_block_list<thread_control_block>::entry {
  thread_control_block() : is_in_critical_region(false), local_epoch(number_epochs) {}

  std::atomic<bool> is_in_critical_region;
  std::atomic<epoch_t> local_epoch;
};

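// Thread-local state: per-epoch retire lists, region/critical-section nesting
// counters and the scan-strategy state. The control block is acquired lazily on
// first use and handed back to the global list when the thread terminates.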
template <class Traits>
struct generic_epoch_based<Traits>::thread_data {
  ~thread_data() {
    if (control_block == nullptr) {
      return; // no control_block -> nothing to do
    }

    for (unsigned i = 0; i < number_epochs; ++i) {
      if (!retire_lists[i].empty()) {
        orphans[i].add(retire_lists[i].steal());
      }
    }

    assert(control_block->is_in_critical_region.load(std::memory_order_relaxed) == false);
    global_thread_block_list.release_entry(control_block);
    control_block = nullptr;
  }

  void enter_region() {
    ensure_has_control_block();
    if (Traits::region_extension_type != region_extension::none && ++region_entries == 1) {
      if (Traits::region_extension_type == region_extension::eager) {
        set_critical_region_flag();
      }
    }
  }

  void leave_region() {
    if (Traits::region_extension_type != region_extension::none && --region_entries == 0) {
      clear_critical_region_flag();
    }
  }

  void enter_critical() {
    enter_region();
    if (++nested_critical_entries == 1) {
      do_enter_critical();
    }
  }

  void leave_critical() {
    if (--nested_critical_entries == 0 && Traits::region_extension_type == region_extension::none) {
      clear_critical_region_flag();
    }
    leave_region();
  }

private:
  void ensure_has_control_block() {
    if (XENIUM_UNLIKELY(control_block == nullptr)) {
      acquire_control_block();
    }
  }

  XENIUM_NOINLINE void acquire_control_block() {
    assert(control_block == nullptr);
    control_block = global_thread_block_list.acquire_entry();
    assert(control_block->is_in_critical_region.load() == false);
    auto epoch = global_epoch.load(std::memory_order_relaxed);
    control_block->local_epoch.store(epoch, std::memory_order_relaxed);
    local_epoch_idx = epoch % number_epochs;
    scan_strategy.reset();
  }

  void set_critical_region_flag() {
    assert(!control_block->is_in_critical_region.load(std::memory_order_relaxed));
    control_block->is_in_critical_region.store(true, std::memory_order_relaxed);
    // (3) - this seq_cst-fence enforces a total order with itself, and
    // synchronizes-with the acquire-fence (6)
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

  void clear_critical_region_flag() {
    // if (Traits::region_extension_type == region_extension::lazy && control_block == nullptr)
    //   return;
    // TODO - can this be removed?

    assert(control_block != nullptr);
    assert(control_block->is_in_critical_region.load(std::memory_order_relaxed));

    // (4) - this release-store synchronizes-with the acquire-fence (6)
    control_block->is_in_critical_region.store(false, std::memory_order_release);
    for (unsigned i = 0; i < number_epochs; ++i) {
      Traits::abandon_strategy::apply(retire_lists[i], orphans[i]);
    }
  }

  void do_enter_critical() {
    if constexpr (Traits::region_extension_type == region_extension::none) {
      set_critical_region_flag();
    } else if constexpr (Traits::region_extension_type == region_extension::lazy) {
      if (!control_block->is_in_critical_region.load(std::memory_order_relaxed)) {
        set_critical_region_flag();
      }
    }

    assert(control_block->is_in_critical_region.load(std::memory_order_relaxed));

    // (5) - this acquire-load synchronizes-with the release-CAS (7)
    auto epoch = global_epoch.load(std::memory_order_acquire);
    if (control_block->local_epoch.load(std::memory_order_relaxed) != epoch) { // New epoch?
      critical_entries_since_update = 0;
      update_local_epoch(epoch);
    } else if (critical_entries_since_update++ == Traits::scan_frequency) {
      critical_entries_since_update = 0;
      if (scan_strategy.scan(epoch)) {
        epoch = update_global_epoch(epoch, epoch + 1);
        update_local_epoch(epoch);
      }
    }
  }

  void update_local_epoch(epoch_t new_epoch) {
    // We either just updated the global_epoch or we are observing a new epoch
    // from some other thread; either way, we can reclaim all the objects from
    // the old 'incarnation' of this epoch, as well as from other epochs we
    // might have missed when we were not in a critical region.

    const auto old_epoch = control_block->local_epoch.load(std::memory_order_relaxed);
    assert(new_epoch > old_epoch);
    // TSan does not support explicit fences, so we cannot rely on the fences (3) and (6)
    // but have to perform a release-store here to avoid false positives.
    constexpr auto memory_order = TSAN_MEMORY_ORDER(std::memory_order_release, std::memory_order_relaxed);
    control_block->local_epoch.store(new_epoch, memory_order);

    auto diff = std::min<int>(static_cast<int>(number_epochs), static_cast<int>(new_epoch - old_epoch));
    epoch_t epoch_idx = local_epoch_idx;
    for (int i = diff - 1; i >= 0; --i) {
      epoch_idx = (new_epoch - i) % number_epochs;
      auto nodes = retire_lists[epoch_idx].steal();
      detail::delete_objects(nodes.first);
    }
    local_epoch_idx = epoch_idx;

    scan_strategy.reset();
  }

  epoch_t update_global_epoch(epoch_t curr_epoch, epoch_t new_epoch) {
    if (global_epoch.load(std::memory_order_relaxed) == curr_epoch) {
      // (6) - due to the load operations in scan, this acquire-fence synchronizes-with the release-store (4)
      // and the seq-cst fence (3)
      std::atomic_thread_fence(std::memory_order_acquire);

      // (7) - this release-CAS synchronizes-with the acquire-load (5)
      bool success = global_epoch.compare_exchange_strong(
        curr_epoch, new_epoch, std::memory_order_release, std::memory_order_relaxed);
      if (XENIUM_LIKELY(success)) {
        reclaim_orphans(new_epoch);
      }
    }
    return new_epoch;
  }

  void add_retired_node(detail::deletable_object* p) { retire_lists[local_epoch_idx].push(p); }

  void reclaim_orphans(epoch_t epoch) {
    auto idx = epoch % number_epochs;
    auto* nodes = orphans[idx].adopt();
    detail::delete_objects(nodes);
  }

  unsigned critical_entries_since_update = 0;
  unsigned nested_critical_entries = 0;
  unsigned region_entries = 0;
  typename Traits::scan_strategy::template type<generic_epoch_based> scan_strategy;
  thread_control_block* control_block = nullptr;
  epoch_t local_epoch_idx = 0;
  std::array<typename Traits::abandon_strategy::retire_list, number_epochs> retire_lists = {};

  friend class generic_epoch_based;
  ALLOCATION_COUNTER(generic_epoch_based);
};

#ifdef TRACK_ALLOCATIONS
template <class Traits>
inline void generic_epoch_based<Traits>::count_allocation() {
  local_thread_data.allocation_counter.count_allocation();
}

template <class Traits>
inline void generic_epoch_based<Traits>::count_reclamation() {
  local_thread_data.allocation_counter.count_reclamation();
}
#endif
} // namespace xenium::reclamation

#ifdef _MSC_VER
  #pragma warning(pop)
#endif
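
The members defined above are normally driven by a concurrent data structure rather than called directly. The following sketch (not part of the header) shows the usual protect/unlink/retire sequence against this API. It assumes that Reclaimer is some instantiation of generic_epoch_based, that Node is a node type managed by it, that head is a concurrent_ptr owned by the surrounding data structure and non-null here, and that guard_ptr's second template parameter and reclaim()'s Deleter have suitable defaults in the declaring header, which is not shown in this impl file.

#include <atomic>

// Illustrative sketch only; see the assumptions stated above.
template <class Reclaimer, class Node>
void unlink_and_retire(typename Reclaimer::template concurrent_ptr<Node>& head) {
  // Optional: keep the thread inside a critical region across several operations.
  // This only has an effect when the traits select an eager or lazy region extension.
  [[maybe_unused]] typename Reclaimer::region_guard rg;

  // acquire() enters the thread's critical region while the loaded pointer is
  // non-null, so the node cannot be deleted while we still dereference it.
  typename Reclaimer::template guard_ptr<Node> guard;
  guard.acquire(head, std::memory_order_acquire);

  // ... unlink the node from the data structure here ...

  // reclaim() stores the deleter in the node, pushes the node onto the current
  // epoch's retire list and resets the guard; the node is deleted once enough
  // epoch transitions have happened that no thread can still hold a protected
  // reference to it. The braces assume a default-constructible Deleter.
  guard.reclaim({});
}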