//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <type_traits>

#ifdef _MSC_VER
  #pragma warning(push)
  #pragma warning(disable : 26495) // uninitialized member variable
#endif
namespace xenium {

/// A bounded lock-free multi-producer/multi-consumer k-FIFO queue.
template <class T, class... Policies>
class kirsch_bounded_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments);
  ~kirsch_bounded_kfifo_queue();

  kirsch_bounded_kfifo_queue(const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue(kirsch_bounded_kfifo_queue&&) = delete;

  kirsch_bounded_kfifo_queue& operator=(const kirsch_bounded_kfifo_queue&) = delete;
  kirsch_bounded_kfifo_queue& operator=(kirsch_bounded_kfifo_queue&&) = delete;

  /// Tries to push a new element to the queue.
  bool try_push(value_type value);

  /// Tries to pop an element from the queue.
  bool try_pop(value_type& result);

private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // we use max here to avoid arrays of size zero which are not allowed by Visual C++
    char padding[std::max(padding_bytes, 1U)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };

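  // Each slot holds an atomic marked pointer; the padded variant appends
  // padding_bytes of padding, typically so that neighbouring slots do not
  // share a cache line (reducing false sharing).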
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

public:
  /// Provides the effective size of a single queue entry (including padding).
  static constexpr std::size_t entry_size = sizeof(entry);

private:
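  // An index into _queue packed together with a modification counter: the lower
  // 16 bits hold the index, the upper 48 bits hold a mark that is incremented on
  // every update (guarding against ABA problems on _head/_tail).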
  struct marked_idx {
    marked_idx() = default;
    marked_idx(uint64_t val, uint64_t mark) noexcept { _val = val | (mark << bits); }

    [[nodiscard]] uint64_t get() const noexcept { return _val & val_mask; }
    [[nodiscard]] uint64_t mark() const noexcept { return _val >> bits; }
    bool operator==(const marked_idx& other) const noexcept { return this->_val == other._val; }
    bool operator!=(const marked_idx& other) const noexcept { return this->_val != other._val; }

  private:
    static constexpr unsigned bits = 16;
    static constexpr uint64_t val_mask = (static_cast<uint64_t>(1) << bits) - 1;
    uint64_t _val = 0;
  };

  template <bool Empty>
  bool find_index(uint64_t start_index, uint64_t& index, marked_value& old);
  bool queue_full(const marked_idx& head_old, const marked_idx& tail_old) const;
  bool segment_empty(const marked_idx& head_old) const;
  [[nodiscard]] bool not_in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  [[nodiscard]] bool in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool committed(const marked_idx& tail_old, marked_value new_value, uint64_t index);

  std::uint64_t _queue_size;
  std::size_t _k;
  // all operations on head/tail are synchronized via the value operations and
  // can therefore use memory_order_relaxed.
  std::atomic<marked_idx> _head;
  std::atomic<marked_idx> _tail;
  std::unique_ptr<entry[]> _queue;
};
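
// Usage sketch (assuming a raw-pointer value_type, which the pointer-queue
// traits accept); names and values below are illustrative only:
//
//   xenium::kirsch_bounded_kfifo_queue<int*> queue(4, 10); // k = 4, 10 segments
//
//   if (!queue.try_push(new int(42))) {
//     // the queue was full
//   }
//
//   int* value = nullptr;
//   if (queue.try_pop(value)) {
//     // ... use *value ...
//     delete value;
//   }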

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments) :
  _queue_size(k * num_segments),
  _k(k),
  _head(),
  _tail(),
  _queue(new entry[k * num_segments]()) {}

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::~kirsch_bounded_kfifo_queue() {
  for (unsigned i = 0; i < _queue_size; ++i) {
    traits::delete_value(_queue[i].value.load(std::memory_order_relaxed).get());
  }
}

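// try_push: pick a free slot (in random order) within the k-sized tail segment,
// publish the value with a tagged release-CAS and confirm the insert via
// committed(). If the segment has no free slot, the tail is advanced by k;
// should the queue look full, the head is first advanced past an already empty
// head segment, or the push fails.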
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_push(value_type value) {
  if (value == nullptr) {
    throw std::invalid_argument("value can not be nullptr");
  }

  raw_value_type raw_value = traits::get_raw(value);
  for (;;) {
    marked_idx tail_old = _tail.load(std::memory_order_relaxed);
    marked_idx head_old = _head.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<true>(tail_old.get(), idx, old_value);
    if (tail_old != _tail.load(std::memory_order_relaxed)) {
      continue;
    }

    if (found_idx) {
      assert(old_value.get() == nullptr);
      const marked_value new_value(raw_value, old_value.mark() + 1);
      // (1) - this release-CAS synchronizes with the acquire-load (3, 4)
      if (_queue[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        return true;
      }
    } else {
      if (queue_full(head_old, tail_old)) {
        if (segment_empty(head_old)) {
          // increment head by k
          marked_idx new_head((head_old.get() + _k) % _queue_size, head_old.mark() + 1);
          _head.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
        } else if (head_old == _head.load(std::memory_order_relaxed)) {
          // queue is full
          return false;
        }
      }
      // increment tail by k
      marked_idx new_tail((tail_old.get() + _k) % _queue_size, tail_old.mark() + 1);
      _tail.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
    }
  }
}

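// try_pop: pick an occupied slot (in random order) within the k-sized head
// segment and empty it with a tagged release-CAS. If head and tail point to the
// same segment, the tail is advanced first. If no occupied slot is found, the
// queue is either empty or the head is advanced by k.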
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  for (;;) {
    marked_idx head_old = _head.load(std::memory_order_relaxed);
    marked_idx tail_old = _tail.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<false>(head_old.get(), idx, old_value);
    if (head_old != _head.load(std::memory_order_relaxed)) {
      continue;
    }

    if (found_idx) {
      assert(old_value.get() != nullptr);
      if (head_old.get() == tail_old.get()) {
        marked_idx new_tail((tail_old.get() + _k) % _queue_size, tail_old.mark() + 1);
        _tail.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
      }
      marked_value new_value(nullptr, old_value.mark() + 1);
      // (2) - this release-CAS synchronizes with the acquire-load (3, 4)
      if (_queue[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == _tail.load(std::memory_order_relaxed)) {
        return false;
      }

      marked_idx new_head((head_old.get() + _k) % _queue_size, head_old.mark() + 1);
      _head.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
    }
  }
}

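// Scans the k slots of the segment starting at start_index (beginning at a
// random offset) for an empty slot (Empty == true) or an occupied one
// (Empty == false); on success the slot index and its current value are
// returned via value_index and old.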
template <class T, class... Policies>
template <bool Empty>
bool kirsch_bounded_kfifo_queue<T, Policies...>::find_index(uint64_t start_index,
                                                            uint64_t& value_index,
                                                            marked_value& old) {
  const uint64_t random_index = utils::random() % _k;
  for (size_t i = 0; i < _k; i++) {
    // TODO - this can be simplified if queue_size is a multiple of k!
    uint64_t index = (start_index + ((random_index + i) % _k)) % _queue_size;
    // (3) - this acquire-load synchronizes-with the release-CAS (1, 2)
    old = _queue[index].value.load(std::memory_order_acquire);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}

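// Checks whether a freshly inserted value counts as committed: it does if the
// slot has already been changed again (a concurrent pop consumed it) or if the
// enqueuer's tail snapshot still lies in the valid region between head and
// tail. Otherwise the algorithm either bumps the head's mark or takes the value
// out again, in which case the caller retries the push.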
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::committed(const marked_idx& tail_old,
                                                           marked_value value,
                                                           uint64_t index) {
  if (_queue[index].value.load(std::memory_order_relaxed) != value) {
    return true;
  }

  marked_idx tail_current = _tail.load(std::memory_order_relaxed);
  marked_idx head_current = _head.load(std::memory_order_relaxed);
  if (in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    return true;
  }

  if (not_in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    marked_value new_value(nullptr, value.mark() + 1);
    if (!_queue[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed)) {
      return true;
    }
  } else {
    marked_idx new_head(head_current.get(), head_current.mark() + 1);
    if (_head.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed)) {
      return true;
    }

    marked_value new_value(nullptr, value.mark() + 1);
    if (!_queue[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed)) {
      return true;
    }
  }
  return false;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::queue_full(const marked_idx& head_old,
                                                            const marked_idx& tail_old) const {
  return (((tail_old.get() + _k) % _queue_size) == head_old.get() &&
          (head_old == _head.load(std::memory_order_relaxed)));
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::segment_empty(const marked_idx& head_old) const {
  const uint64_t start = head_old.get();
  for (size_t i = 0; i < _k; i++) {
    // TODO - this can be simplified if queue_size is a multiple of k!
    // (4) - this acquire-load synchronizes-with the release-CAS (1, 2)
    if (_queue[(start + i) % _queue_size].value.load(std::memory_order_acquire).get() != nullptr) {
      return false;
    }
  }
  return true;
}

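// tail_old is considered valid if it lies in the interval (head_current,
// tail_current], taking wrap-around of the circular index space into account.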
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::in_valid_region(uint64_t tail_old,
                                                                 uint64_t tail_current,
                                                                 uint64_t head_current) const {
  bool wrap_around = tail_current < head_current;
  if (!wrap_around) {
    return head_current < tail_old && tail_old <= tail_current;
  }
  return head_current < tail_old || tail_old <= tail_current;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::not_in_valid_region(uint64_t tail_old,
                                                                     uint64_t tail_current,
                                                                     uint64_t head_current) const {
  bool wrap_around = tail_current < head_current;
  if (!wrap_around) {
    return tail_old < tail_current || head_current < tail_old;
  }
  return tail_old < tail_current && head_current < tail_old;
}
} // namespace xenium

#ifdef _MSC_VER
  #pragma warning(pop)
#endif

#endif