xenium: nikolaev_scq.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_DETAIL_NIKOLAEV_SCQ_HPP
#define XENIUM_DETAIL_NIKOLAEV_SCQ_HPP

#include "xenium/utils.hpp"

#include <atomic>
#include <cassert>
#include <memory>

namespace xenium::detail {

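// Bounded lock-free index queue based on Nikolaev's Scalable Circular Queue (SCQ).
// It transfers small indices in the range [0, capacity) rather than arbitrary values
// and is used as a detail building block by the higher-level Nikolaev queues.
//
// Illustrative usage sketch (not part of this header; the capacity of 1024 is just an
// example and must be a power of two, enqueued values must be < capacity):
//
//   std::size_t capacity = 1024;
//   std::size_t shift = nikolaev_scq::calc_remap_shift(capacity);
//   nikolaev_scq q(capacity, shift, nikolaev_scq::full_tag{}); // pre-filled with 0..capacity-1
//   std::uint64_t idx;
//   if (q.dequeue<false, 16>(idx, capacity, shift)) {
//     // ... use idx ...
//     q.enqueue<false, false>(idx, capacity, shift);
//   }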
struct nikolaev_scq {
  struct empty_tag {};
  struct full_tag {};
  struct first_used_tag {};
  struct first_empty_tag {};

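  // The tag passed to the constructor selects the initial fill of the queue:
  //  - empty_tag:       no entries
  //  - full_tag:        pre-filled with the values 0..capacity-1
  //  - first_used_tag:  contains only the value 0
  //  - first_empty_tag: pre-filled with the values 1..capacity-1 (value 0 already consumed)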
  nikolaev_scq(std::size_t capacity, std::size_t remap_shift, empty_tag);
  nikolaev_scq(std::size_t capacity, std::size_t remap_shift, full_tag);
  nikolaev_scq(std::size_t capacity, std::size_t remap_shift, first_used_tag);
  nikolaev_scq(std::size_t capacity, std::size_t remap_shift, first_empty_tag);

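  // enqueue/dequeue transfer values in the range [0, capacity).
  //  - Nonempty:    the queue is known to never appear empty, so the _threshold
  //                 bookkeeping used for empty detection is skipped (a dequeue then
  //                 retries until it obtains a value).
  //  - Finalizable: enqueue checks the finalized bit in _tail and fails once
  //                 finalize() has been called.
  //  - PopRetries:  how often dequeue re-reads a slot whose matching enqueue has not
  //                 completed yet before giving up on that slot.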
  template <bool Nonempty, bool Finalizable>
  bool enqueue(std::uint64_t value, std::size_t capacity, std::size_t remap_shift);
  template <bool Nonempty, std::size_t PopRetries>
  bool dequeue(std::uint64_t& value, std::size_t capacity, std::size_t remap_shift);

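  // finalize() sets the finalized bit (LSB) of _tail so that Finalizable enqueues fail
  // from then on; set_threshold() (re)initializes the counter used for empty detection.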
  void finalize() { _tail.fetch_or(1, std::memory_order_relaxed); }
  void set_threshold(std::int64_t v) { _threshold.store(v, std::memory_order_relaxed); }

  static constexpr std::size_t calc_remap_shift(std::size_t capacity) {
    assert(utils::is_power_of_two(capacity));
    return utils::find_last_bit_set(capacity / indexes_per_cacheline);
  }

private:
  using index_t = std::uint64_t;
  using indexdiff_t = std::int64_t;
  using value_t = std::uint64_t;

  static constexpr std::size_t cacheline_size = 64;
  static constexpr std::size_t indexes_per_cacheline = cacheline_size / sizeof(index_t);

  void catchup(std::uint64_t tail, std::uint64_t head);

  static inline indexdiff_t diff(index_t a, index_t b) { return static_cast<indexdiff_t>(a - b); }

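  // Spreads consecutive ring indices across different cache lines to reduce false
  // sharing between the head and tail ends of the ring: after dropping the low bit
  // (reserved for the finalize flag), the low-order bits of the index select the
  // cache line and the high-order bits select the slot within that line.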
  static inline index_t remap_index(index_t idx, std::size_t remap_shift, std::size_t n) {
    assert(remap_shift == 0 || (1 << remap_shift) * indexes_per_cacheline == n);
    idx >>= 1;
    return ((idx & (n - 1)) >> remap_shift) | ((idx * indexes_per_cacheline) & (n - 1));
  }

  // index values are structured as follows
  // 0..log2(capacity)+1 bits - value [0..capacity-1, nil (=2*capacity-1)]
  // 1 bit - is_safe flag
  // log2(capacity) + 2..n bits - cycle

  std::atomic<index_t> _head;
  alignas(64) std::atomic<std::int64_t> _threshold;
  alignas(64) std::atomic<index_t> _tail;
  alignas(64) std::unique_ptr<std::atomic<std::uint64_t>[]> _data;

  // the LSB is used for finalization
  static constexpr index_t finalized = 1;
  static constexpr index_t index_inc = 2;
};

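// Initializes an empty queue: every slot is nil (-1) and the threshold is negative,
// so dequeue immediately reports the queue as empty.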
inline nikolaev_scq::nikolaev_scq(std::size_t capacity, std::size_t remap_shift, empty_tag) :
    _head(0),
    _threshold(-1),
    _tail(0),
    _data(new std::atomic<index_t>[capacity * 2]) {
  const auto n = capacity * 2;
  for (std::size_t i = 0; i < n; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  }
}

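// Initializes a full queue: slots 0..capacity-1 hold the values 0..capacity-1 (with the
// is_safe bit set), _tail is advanced by capacity entries and the threshold is set to
// 3 * capacity - 1.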
inline nikolaev_scq::nikolaev_scq(std::size_t capacity, std::size_t remap_shift, full_tag) :
    _head(0),
    _threshold(static_cast<std::int64_t>(capacity) * 3 - 1),
    _tail(capacity * index_inc),
    _data(new std::atomic<index_t>[capacity * 2]) {
  const auto n = capacity * 2;
  for (std::size_t i = 0; i < capacity; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(n + i, std::memory_order_relaxed);
  }
  for (std::size_t i = capacity; i < n; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  }
}

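// Initializes a queue that contains only the value 0: slot 0 is populated, all other
// slots are nil, and _tail is advanced by one entry.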
inline nikolaev_scq::nikolaev_scq(std::size_t capacity, std::size_t remap_shift, first_used_tag) :
    _head(0),
    _threshold(static_cast<std::int64_t>(capacity) * 3 - 1),
    _tail(index_inc),
    _data(new std::atomic<index_t>[capacity * 2]) {
  const auto n = capacity * 2;
  _data[remap_index(0, remap_shift, n)].store(n, std::memory_order_relaxed);
  for (std::size_t i = 1; i < n; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  }
}

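// Initializes a queue that contains the values 1..capacity-1: slot 0 has already been
// consumed, so _head starts one entry ahead of the beginning of the ring.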
inline nikolaev_scq::nikolaev_scq(std::size_t capacity, std::size_t remap_shift, first_empty_tag) :
    _head(index_inc),
    _threshold(static_cast<std::int64_t>(capacity) * 3 - 1),
    _tail(capacity * index_inc),
    _data(new std::atomic<index_t>[capacity * 2]) {
  const auto n = capacity * 2;
  _data[remap_index(0, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  for (std::size_t i = 1; i < capacity; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(n + i, std::memory_order_relaxed);
  }
  for (std::size_t i = capacity; i < n; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  }
}

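// Enqueue: reserve a slot with a fetch_add on _tail, then try to install the value into
// that slot with a CAS. The slot can be used if its cycle is older than the tail's cycle
// and it is either nil+safe, or nil+unsafe with no dequeuer ahead of the reserved
// position; otherwise the reserved tail index is simply consumed and we retry with the
// next one. On success (unless Nonempty) the threshold for empty detection is reset.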
template <bool Nonempty, bool Finalizable>
inline bool nikolaev_scq::enqueue(std::uint64_t value, std::size_t capacity, std::size_t remap_shift) {
  assert(value < capacity);
  const std::size_t n = capacity * 2;
  const std::size_t is_safe_and_value_mask = 2 * n - 1;

  value ^= is_safe_and_value_mask;

  for (;;) {
    auto tail = _tail.fetch_add(index_inc, std::memory_order_relaxed);
    if constexpr (Finalizable) {
      if (tail & finalized) {
        return false;
      }
    } else {
      assert((tail & finalized) == 0);
    }
    const auto tail_cycle = tail | is_safe_and_value_mask;
    const auto tidx = remap_index(tail, remap_shift, n);
    // (1) - this acquire-load synchronizes-with the release-fetch_or (4) and the release-CAS (5)
    auto entry = _data[tidx].load(std::memory_order_acquire);

  retry:
    const auto entry_cycle = entry | is_safe_and_value_mask;
    if (diff(entry_cycle, tail_cycle) < 0 &&
        (entry == entry_cycle ||
         (entry == (entry_cycle ^ n) && diff(_head.load(std::memory_order_relaxed), tail) <= 0))) {
      // (2) - this release-CAS synchronizes-with the acquire-load (3) and the acquire-CAS (5)
      if (!_data[tidx].compare_exchange_weak(
            entry, tail_cycle ^ value, std::memory_order_release, std::memory_order_relaxed)) {
        goto retry;
      }

      const auto threshold = static_cast<std::int64_t>(n + capacity - 1);
      if constexpr (!Nonempty) {
        if (_threshold.load(std::memory_order_relaxed) != threshold) {
          _threshold.store(threshold, std::memory_order_relaxed);
        }
      }
      return true;
    }
  }
}

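// Dequeue: after a (possibly skipped) empty check via _threshold, reserve a slot with a
// fetch_add on _head. If the slot holds a value of the matching cycle it is consumed;
// otherwise the slot is either marked unsafe (it holds a value of an older cycle) or
// advanced to the current head cycle so that a lagging enqueuer can no longer fill it.
// If the queue turned out to be empty, _tail is caught up to _head and the threshold is
// decremented.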
template <bool Nonempty, std::size_t PopRetries>
inline bool nikolaev_scq::dequeue(std::uint64_t& value, std::size_t capacity, std::size_t remap_shift) {
  if constexpr (!Nonempty) {
    if (_threshold.load(std::memory_order_relaxed) < 0) {
      return false;
    }
  }

  const std::size_t n = capacity * 2;
  const std::size_t value_mask = n - 1;
  const std::size_t is_safe_and_value_mask = 2 * n - 1;

  for (;;) {
    const auto head = _head.fetch_add(index_inc, std::memory_order_relaxed);
    assert((head & finalized) == 0);
    const auto head_cycle = head | is_safe_and_value_mask;
    const auto hidx = remap_index(head, remap_shift, n);
    std::size_t attempt = 0;
    std::uint64_t entry_cycle;
    std::uint64_t entry_new;

  retry:
    // (3) - this acquire-load synchronizes-with the release-CAS (2)
    auto entry = _data[hidx].load(std::memory_order_acquire);
    do {
      entry_cycle = entry | is_safe_and_value_mask;
      if (entry_cycle == head_cycle) {
        // (4) - this release-fetch_or synchronizes-with the acquire-load (1)
        _data[hidx].fetch_or(value_mask, std::memory_order_release);
        value = entry & value_mask;
        assert(value < capacity);
        return true;
      }

      if ((entry | n) != entry_cycle) {
        entry_new = entry & ~n;
        if (entry == entry_new) {
          break;
        }
      } else {
        auto tail = _tail.load(std::memory_order_relaxed);
        if (diff(tail, head + index_inc) > 0 && ++attempt <= PopRetries) {
          goto retry;
        }
        // bump the slot to the current head cycle with a nil value, preserving the
        // is_safe bit of the previous entry
        entry_new = head_cycle ^ ((~entry) & n);
      }
    } while (
      diff(entry_cycle, head_cycle) < 0 &&
      // (5) - in case of success, this release-CAS synchronizes with the acquire-load (1),
      // in case of failure, this acquire-CAS synchronizes with the release-CAS (2)
      // It would be sufficient to use release for the success order, but this triggers a
      // false positive in TSan (see https://github.com/google/sanitizers/issues/1264)
      !_data[hidx].compare_exchange_weak(entry, entry_new, std::memory_order_acq_rel, std::memory_order_acquire));

    if constexpr (!Nonempty) {
      auto tail = _tail.load(std::memory_order_relaxed);
      if (diff(tail, head + index_inc) <= 0) {
        catchup(tail, head + index_inc);
        _threshold.fetch_sub(1, std::memory_order_relaxed);
        return false;
      }

      if (_threshold.fetch_sub(1, std::memory_order_relaxed) <= 0) {
        return false;
      }
    }
  }
}

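// catchup is called after a failed dequeue observed _tail at or behind the reserved head
// position; it advances _tail to that position (re-reading _head while the CAS fails) so
// that the tail index does not lag arbitrarily far behind the head index.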
inline void nikolaev_scq::catchup(std::uint64_t tail, std::uint64_t head) {
  while (!_tail.compare_exchange_weak(tail, head, std::memory_order_relaxed)) {
    head = _head.load(std::memory_order_relaxed);
    if (diff(tail, head) >= 0) {
      break;
    }
  }
}
} // namespace xenium::detail

#endif