xenium
vyukov_bounded_queue.hpp
1 //
2 // Copyright (c) 2018-2020 Manuel Pöter.
3 // Licensed under the MIT License. See LICENSE file in the project root for full license information.
4 //
5 
6 #ifndef XENIUM_VYUKOV_BOUNDED_QUEUE_HPP
7 #define XENIUM_VYUKOV_BOUNDED_QUEUE_HPP
8 
9 #include <xenium/parameter.hpp>
10 #include <xenium/utils.hpp>
11 
12 #include <atomic>
13 #include <cassert>
14 #include <cstdint>
15 #include <memory>
16 
17 #ifdef _MSC_VER
18  #pragma warning(push)
19  #pragma warning(disable : 4324) // structure was padded due to alignment specifier
20 #endif
21 
22 namespace xenium {
23 
24 namespace policy {
30  template <bool Value>
32 
33 } // namespace policy
58 template <class T, class... Policies>
60 public:
61  using value_type = T;
62 
63  static constexpr bool default_to_weak =
64  parameter::value_param_t<bool, policy::default_to_weak, false, Policies...>::value;
65  ;
66 
71  explicit vyukov_bounded_queue(std::size_t size) : cells(new cell[size]), index_mask(size - 1) {
72  assert(size >= 2 && utils::is_power_of_two(size));
73  for (std::size_t i = 0; i < size; ++i) {
74  cells[i].sequence.store(i, std::memory_order_relaxed);
75  }
76  enqueue_pos.store(0, std::memory_order_relaxed);
77  dequeue_pos.store(0, std::memory_order_relaxed);
78  }
79 
82 
83  vyukov_bounded_queue& operator=(const vyukov_bounded_queue&) = delete;
84  vyukov_bounded_queue& operator=(vyukov_bounded_queue&&) = delete;
85 
98  template <class... Args>
99  bool try_push(Args&&... args) {
100  return do_try_push<default_to_weak>(std::forward<Args>(args)...);
101  }
102 
120  template <class... Args>
121  bool try_push_strong(Args&&... args) {
122  return do_try_push<false>(std::forward<Args>(args)...);
123  }
124 
142  template <class... Args>
143  bool try_push_weak(Args&&... args) {
144  return do_try_push<true>(std::forward<Args>(args)...);
145  }
146 
158  [[nodiscard]] bool try_pop(T& result) { return do_try_pop<default_to_weak>(result); }
159 
171  [[nodiscard]] bool try_pop_strong(T& result) { return do_try_pop<false>(result); }
172 
184  [[nodiscard]] bool try_pop_weak(T& result) { return do_try_pop<true>(result); }
185 
186 private:
187  template <bool Weak, class... Args>
188  bool do_try_push(Args&&... args) {
189  cell* c;
190  std::size_t pos = enqueue_pos.load(std::memory_order_relaxed);
191  for (;;) {
192  c = &cells[pos & index_mask];
193  // (3) - this acquire-load synchronizes-with the release-store (2)
194  std::size_t seq = c->sequence.load(std::memory_order_acquire);
195  if (seq == pos) {
196  if (enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
197  break;
198  }
199  } else {
200  if (Weak) {
201  if (seq < pos) {
202  return false;
203  }
204  pos = enqueue_pos.load(std::memory_order_relaxed);
205  } else {
206  auto pos2 = enqueue_pos.load(std::memory_order_relaxed);
207  if (pos2 == pos && dequeue_pos.load(std::memory_order_relaxed) + index_mask + 1 == pos) {
208  return false;
209  }
210  pos = pos2;
211  }
212  }
213  }
214  assign_value(c->value, std::forward<Args>(args)...);
215  // (4) - this release-store synchronizes-with the acquire-load (1)
216  c->sequence.store(pos + 1, std::memory_order_release);
217  return true;
218  }
219 
220  template <bool Weak>
221  bool do_try_pop(T& result) {
222  cell* c;
223  std::size_t pos = dequeue_pos.load(std::memory_order_relaxed);
224  for (;;) {
225  c = &cells[pos & index_mask];
226  // (1) - this acquire-load synchronizes-with the release-store (4)
227  std::size_t seq = c->sequence.load(std::memory_order_acquire);
228  auto new_pos = pos + 1;
229  if (seq == new_pos) {
230  if (dequeue_pos.compare_exchange_weak(pos, new_pos, std::memory_order_relaxed)) {
231  break;
232  }
233  } else {
234  if (Weak) {
235  if (seq < new_pos) {
236  return false;
237  }
238  pos = dequeue_pos.load(std::memory_order_relaxed);
239  } else {
240  auto pos2 = dequeue_pos.load(std::memory_order_relaxed);
241  if (pos2 == pos && enqueue_pos.load(std::memory_order_relaxed) == pos) {
242  return false;
243  }
244  pos = pos2;
245  }
246  }
247  }
248  result = std::move(c->value);
249  // (2) - this release-store synchronizes-with the acquire-load (3)
250  c->sequence.store(pos + index_mask + 1, std::memory_order_release);
251  return true;
252  }
253 
254  void assign_value(T& v, const T& source) { v = source; }
255  void assign_value(T& v, T&& source) { v = std::move(source); }
256  template <class... Args>
257  void assign_value(T& v, Args&&... args) {
258  v = T{std::forward<Args>(args)...};
259  }
260 
261  // TODO - add optional padding via policy
262  struct cell {
263  std::atomic<std::size_t> sequence;
264  T value;
265  };
266 
267  std::unique_ptr<cell[]> cells;
268  const std::size_t index_mask;
269  alignas(64) std::atomic<size_t> enqueue_pos;
270  alignas(64) std::atomic<size_t> dequeue_pos;
271 };
272 } // namespace xenium
273 
274 #ifdef _MSC_VER
275  #pragma warning(pop)
276 #endif
277 
278 #endif
xenium::vyukov_bounded_queue::try_pop_weak
bool try_pop_weak(T &result)
Tries to pop an element from the queue.
Definition: vyukov_bounded_queue.hpp:184
xenium::vyukov_bounded_queue
A bounded generic multi-producer/multi-consumer FIFO queue.
Definition: vyukov_bounded_queue.hpp:59
xenium::policy::default_to_weak
Policy to configure whether try_push/try_pop in vyukov_bounded_queue should default to try_push_weak/try_pop_weak.
Definition: vyukov_bounded_queue.hpp:31
xenium::vyukov_bounded_queue::try_push_weak
bool try_push_weak(Args &&... args)
Tries to push a new element to the queue.
Definition: vyukov_bounded_queue.hpp:143
xenium::vyukov_bounded_queue::try_push_strong
bool try_push_strong(Args &&... args)
Tries to push a new element to the queue.
Definition: vyukov_bounded_queue.hpp:121
xenium::vyukov_bounded_queue::vyukov_bounded_queue
vyukov_bounded_queue(std::size_t size)
Constructs a new instance with the specified maximum size.
Definition: vyukov_bounded_queue.hpp:71
xenium::vyukov_bounded_queue::try_pop
bool try_pop(T &result)
Tries to pop an element from the queue.
Definition: vyukov_bounded_queue.hpp:158
xenium::vyukov_bounded_queue::try_push
bool try_push(Args &&... args)
Tries to push a new element to the queue.
Definition: vyukov_bounded_queue.hpp:99
xenium::vyukov_bounded_queue::try_pop_strong
bool try_pop_strong(T &result)
Tries to pop an element from the queue as long as the queue is not empty.
Definition: vyukov_bounded_queue.hpp:171