//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_KIRSCH_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <type_traits>
namespace xenium {

/// An unbounded lock-free multi-producer/multi-consumer k-FIFO queue.
///
/// A k-FIFO queue relaxes strict FIFO order: each segment holds k slots, and
/// elements within a segment may be dequeued in any order. The number of slots
/// per segment is specified at construction time.
template <class T, class... Policies>
class kirsch_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");

  template <class... NewPolicies>
  using with = kirsch_kfifo_queue<T, NewPolicies..., Policies...>;

  explicit kirsch_kfifo_queue(uint64_t k);
  ~kirsch_kfifo_queue();

  kirsch_kfifo_queue(const kirsch_kfifo_queue&) = delete;
  kirsch_kfifo_queue(kirsch_kfifo_queue&&) = delete;

  kirsch_kfifo_queue& operator=(const kirsch_kfifo_queue&) = delete;
  kirsch_kfifo_queue& operator=(kirsch_kfifo_queue&&) = delete;

  /// Pushes the given value to the queue.
  /// Throws std::invalid_argument if value is nullptr.
  void push(value_type value);

  /// Tries to pop an object from the queue.
  /// Returns true if an element was popped, false if the queue appeared empty.
  [[nodiscard]] bool try_pop(value_type& result);

private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // we use max here to avoid arrays of size zero which are not allowed by Visual C++
    char padding[std::max(padding_bytes, 1u)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

public:
  /// Provides the effective size of a single queue entry (including padding).
  static constexpr std::size_t entry_size = sizeof(entry);

private:
  struct segment;

  struct segment_deleter {
    void operator()(segment* seg) const { release_segment(seg); }
  };
  struct segment : reclaimer::template enable_concurrent_ptr<segment, 16, segment_deleter> {
    using concurrent_ptr = typename reclaimer::template concurrent_ptr<segment, 16>;

    explicit segment(uint64_t k) : k(k) {}
    ~segment() override {
      for (unsigned i = 0; i < k; ++i) {
        assert(items()[i].value.load(std::memory_order_relaxed).get() == nullptr);
      }
    }

    void delete_remaining_items() {
      for (unsigned i = 0; i < k; ++i) {
        traits::delete_value(items()[i].value.load(std::memory_order_relaxed).get());
        items()[i].value.store(nullptr, std::memory_order_relaxed);
      }
    }

    entry* items() noexcept { return reinterpret_cast<entry*>(this + 1); }

    std::atomic<bool> deleted{false};
    const uint64_t k;
    concurrent_ptr next{};
  };

  using concurrent_ptr = typename segment::concurrent_ptr;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  segment* alloc_segment() const;
  static void release_segment(segment* seg);

  template <bool Empty>
  bool find_index(marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept;
  void advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept;
  void advance_tail(marked_ptr tail_current) noexcept;
  bool committed(marked_ptr segment, marked_value value, uint64_t index) noexcept;

  const std::size_t k_;
  concurrent_ptr head_;
  concurrent_ptr tail_;
};
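
// Example usage (an illustrative sketch, not part of the original header; the
// reclaimer chosen below is just one of the reclamation schemes xenium
// provides and is not mandated by this class):
//
//   #include <xenium/reclamation/generic_epoch_based.hpp>
//
//   xenium::kirsch_kfifo_queue<int*,
//     xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>>
//     queue{8}; // k = 8 slots per segment
//
//   queue.push(new int{42});
//   int* value = nullptr;
//   if (queue.try_pop(value)) {
//     delete value;
//   }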

template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::kirsch_kfifo_queue(uint64_t k) : k_(k) {
  const auto seg = alloc_segment();
  head_.store(seg, std::memory_order_relaxed);
  tail_.store(seg, std::memory_order_relaxed);
}

template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::~kirsch_kfifo_queue() {
  auto seg = head_.load(std::memory_order_relaxed).get();
  while (seg) {
    auto next = seg->next.load(std::memory_order_relaxed).get();
    seg->delete_remaining_items();
    release_segment(seg);
    seg = next;
  }
}
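
// The segment header and its k entries occupy one contiguous allocation (see
// alloc_segment below); items() locates the entry array directly behind the
// header as reinterpret_cast<entry*>(this + 1):
//
//   [ segment header ][ entry 0 ][ entry 1 ] ... [ entry k-1 ]
//   ^ this            ^ this + 1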

template <class T, class... Policies>
auto kirsch_kfifo_queue<T, Policies...>::alloc_segment() const -> segment* {
  void* data = ::operator new(sizeof(segment) + k_ * sizeof(entry));
  auto result = new (data) segment(k_);
  for (std::size_t i = 0; i < k_; ++i) {
    new (&result->items()[i]) entry();
  }
  return result;
}

template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::release_segment(segment* seg) {
  seg->~segment();
  ::operator delete(seg);
}

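// push finds a random empty slot in the tail segment and publishes the value
// with a CAS; committed() then checks that the slot still belongs to the
// queue, since the tail segment may have been removed concurrently. If no
// empty slot exists, advance_tail() appends or activates a new segment and
// the operation retries.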
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::push(value_type value) {
  if (value == nullptr) {
    throw std::invalid_argument("value cannot be nullptr");
  }

  raw_value_type raw_value = traits::get_raw(value);
  guard_ptr tail_old;
  for (;;) {
    // (1) - this acquire-load synchronizes-with the release-CAS (9, 12, 14)
    tail_old.acquire(tail_, std::memory_order_acquire);

    // TODO - local linearizability

    uint64_t idx = 0;
    marked_value old_value;
    bool found_idx = find_index<true>(tail_old, idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed)) {
      continue;
    }

    if (found_idx) {
      const marked_value new_value(raw_value, old_value.mark() + 1);
      // (2) - this release-CAS synchronizes-with the acquire-CAS (5)
      if (tail_old->items()[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        // TODO - local linearizability
        return;
      }
    } else {
      advance_tail(tail_old);
    }
  }
}

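// try_pop finds a random occupied slot in the head segment and empties it
// with a CAS that bumps the slot's mark to avoid ABA problems. If the head
// segment has no occupied slot and head equals tail, the queue is empty;
// otherwise advance_head() moves past the exhausted segment and the
// operation retries.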
template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  guard_ptr head_old;
  for (;;) {
    // (3) - this acquire-load synchronizes-with the release-CAS (10)
    head_old.acquire(head_, std::memory_order_acquire);
    auto h = head_old.get();
    (void)h;
    uint64_t idx = 0;
    marked_value old_value;
    bool found_idx = find_index<false>(head_old, idx, old_value);
    if (head_old != head_.load(std::memory_order_relaxed)) {
      continue;
    }

    // (4) - this acquire-load synchronizes-with the release-CAS (9, 12, 14)
    marked_ptr tail_old = tail_.load(std::memory_order_acquire);
    if (found_idx) {
      assert(old_value.get() != (void*)0x100);
      if (head_old.get() == tail_old.get()) {
        advance_tail(tail_old);
      }

      const marked_value new_value(nullptr, old_value.mark() + 1);
      // (5) - this acquire-CAS synchronizes-with the release-CAS (2)
      if (head_old->items()[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_acquire, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed)) {
        return false; // queue is empty
      }
      advance_head(head_old, tail_old);
    }
  }
}

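// find_index scans the k slots of the given segment, starting at a random
// offset to reduce contention between threads. The Empty template parameter
// selects whether to search for a free slot (push) or an occupied one (pop).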
template <class T, class... Policies>
template <bool Empty>
bool kirsch_kfifo_queue<T, Policies...>::find_index(marked_ptr segment,
                                                    uint64_t& value_index,
                                                    marked_value& old) const noexcept {
  const uint64_t k = segment->k;
  const uint64_t random_index = utils::random() % k;
  for (size_t i = 0; i < k; i++) {
    uint64_t index = ((random_index + i) % k);
    old = segment->items()[index].value.load(std::memory_order_relaxed);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}

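// committed is called after push has CASed its value into a slot. It returns
// true if the enqueue counts, i.e., the slot still belongs to a segment that
// is reachable from the queue; if the segment has been removed in the
// meantime, the value must be taken out again, unless a concurrent pop
// already consumed it (which also counts as success).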
template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::committed(marked_ptr segment, marked_value value, uint64_t index) noexcept {
  if (value != segment->items()[index].value.load(std::memory_order_relaxed)) {
    // The slot has already changed; a concurrent pop consumed our value.
    return true;
  }

  const marked_value empty_value(nullptr, value.mark() + 1);

  if (segment->deleted.load(std::memory_order_relaxed)) {
    // The segment we inserted into has been removed from the queue. The insert
    // only counts if the element has already been removed by a concurrent pop.
    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  // (6) - this acquire-load synchronizes-with the release-CAS (10)
  marked_ptr head_current = head_.load(std::memory_order_acquire);
  if (segment.get() == head_current.get()) {
    // The segment we inserted into has become the head segment.
    marked_ptr new_head(head_current.get(), head_current.mark() + 1);
    // This relaxed-CAS is part of a release sequence headed by (10)
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed)) {
      // We are fine if we can update the head and thereby make any concurrent
      // advance_head attempts fail.
      return true;
    }

    // Otherwise the insert only counts if the element has already been removed.
    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  if (!segment->deleted.load(std::memory_order_relaxed)) {
    // The segment we inserted into is still part of the queue.
    return true;
  }
  // Head and tail have moved beyond this segment. Try to remove the item;
  // the insert only counts if the element has already been removed.
  return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
}

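// advance_head moves head_ to the next segment once the current head segment
// is exhausted, marks the old segment as deleted, and hands it to the
// configured reclamation scheme. If head has caught up with tail, tail is
// pushed forward first so that head never overtakes it.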
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept {
  // (7) - this acquire-load synchronizes-with the release-CAS (13)
  const marked_ptr head_next_segment = head_current->next.load(std::memory_order_acquire);
  if (head_current != head_.load(std::memory_order_relaxed)) {
    return;
  }

  if (head_current.get() == tail_current.get()) {
    // (8) - this acquire-load synchronizes-with the release-CAS (13)
    const marked_ptr tail_next_segment = tail_current->next.load(std::memory_order_acquire);
    if (tail_next_segment.get() == nullptr) {
      return;
    }

    if (tail_current == tail_.load(std::memory_order_relaxed)) {
      marked_ptr new_tail(tail_next_segment.get(), tail_current.mark() + 1);
      // (9) - this release-CAS synchronizes-with the acquire-load (1, 4)
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    }
  }

  head_current->deleted.store(true, std::memory_order_relaxed);

  marked_ptr expected = head_current;
  marked_ptr new_head(head_next_segment.get(), head_current.mark() + 1);
  // (10) - this release-CAS synchronizes-with the acquire-load (3, 6)
  if (head_.compare_exchange_strong(expected, new_head, std::memory_order_release, std::memory_order_relaxed)) {
    head_current.reclaim();
  }
}

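// advance_tail moves tail_ to the next segment, allocating and linking a
// fresh segment if the current tail has no successor yet. Only the thread
// that wins the CAS on next keeps its allocation; a losing thread releases
// its segment again.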
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_tail(marked_ptr tail_current) noexcept {
  // (11) - this acquire-load synchronizes-with the release-CAS (13)
  marked_ptr next_segment = tail_current->next.load(std::memory_order_acquire);
  if (tail_current != tail_.load(std::memory_order_relaxed)) {
    return;
  }

  if (next_segment.get() != nullptr) {
    marked_ptr new_tail(next_segment.get(), next_segment.mark() + 1);
    // (12) - this release-CAS synchronizes-with the acquire-load (1, 4)
    tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
  } else {
    auto seg = alloc_segment();
    const marked_ptr new_segment(seg, next_segment.mark() + 1);
    // TODO - insert own value to simplify push?
    // (13) - this release-CAS synchronizes-with the acquire-load (7, 8, 11)
    if (tail_current->next.compare_exchange_strong(
          next_segment, new_segment, std::memory_order_release, std::memory_order_relaxed)) {
      marked_ptr new_tail(seg, tail_current.mark() + 1);
      // (14) - this release-CAS synchronizes-with the acquire-load (1, 4)
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    } else {
      release_segment(seg);
    }
  }
}
} // namespace xenium
#endif