xenium
nikolaev_queue.hpp
1 //
2 // Copyright (c) 2018-2020 Manuel Pöter.
3 // Licensed under the MIT License. See LICENSE file in the project root for full license information.
4 //
5 
6 #ifndef XENIUM_NIKOLAEV_QUEUE_HPP
7 #define XENIUM_NIKOLAEV_QUEUE_HPP
8 
9 #include <xenium/parameter.hpp>
10 #include <xenium/policy.hpp>
11 #include <xenium/utils.hpp>
12 
13 #include <xenium/detail/nikolaev_scq.hpp>
14 
15 #include <atomic>
16 #include <cassert>
17 #include <cstdint>
18 #include <memory>
19 
20 namespace xenium {
template <class T, class... Policies>
// NOTE(review): the `class nikolaev_queue { ... };` header line (hpp:48 per the
// index at the bottom of this extract) appears to have been lost in extraction;
// the members below are the class interior - confirm against the original header.
public:
  /// Type of the values stored in the queue.
  using value_type = T;
  /// Reclamation scheme used to manage retired nodes; must be set via policy::reclaimer.
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  /// Number of spin iterations on a pending entry before a dequeue attempt gives up
  /// and retries (policy::pop_retries, default 1000).
  static constexpr unsigned pop_retries =
    parameter::value_param_t<unsigned, policy::pop_retries, 1000, Policies...>::value;
  /// Number of value slots per allocated node (policy::entries_per_node, default 512).
  static constexpr unsigned entries_per_node =
    parameter::value_param_t<unsigned, policy::entries_per_node, 512, Policies...>::value;

  // The SCQ index remapping below requires a power-of-two capacity.
  static_assert(utils::is_power_of_two(entries_per_node), "entries_per_node must be a power of two");
  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");

  /// Rebind the queue type with additional/overriding policies (new ones take precedence).
  template <class... NewPolicies>
  using with = nikolaev_queue<T, NewPolicies..., Policies...>;

  ~nikolaev_queue();

  // The queue is neither copyable nor movable - other threads may hold
  // guard_ptrs into its nodes.
  nikolaev_queue(const nikolaev_queue&) = delete;
  nikolaev_queue(nikolaev_queue&&) = delete;

  nikolaev_queue& operator=(const nikolaev_queue&) = delete;
  nikolaev_queue& operator=(nikolaev_queue&&) = delete;

  /// Pushes the given value. Progress guarantees depend on the node-level SCQ
  /// operations; allocates a new node when the current tail node is full.
  void push(value_type value);

  /// Tries to pop an element; returns false only when the queue is empty.
  bool try_pop(value_type& result);

private:
  struct node;

  // Reclaimer-provided pointer types: concurrent_ptr is the shared atomic pointer,
  // marked_ptr its plain (possibly tag-carrying) value, guard_ptr a hazard-protected
  // reference that keeps the node alive while in use.
  using concurrent_ptr = typename reclaimer::template concurrent_ptr<node, 0>;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  // Raw, suitably aligned storage for one T; elements are constructed/destroyed
  // manually via placement-new in the node's slot array.
  using storage_t = typename std::aligned_storage<sizeof(T), alignof(T)>::type;

  // Precomputed shift used by nikolaev_scq to remap slot indices (cache-friendly
  // index permutation for the given capacity).
  static constexpr unsigned remap_shift = detail::nikolaev_scq::calc_remap_shift(entries_per_node);

  // TODO - preallocate memory for storage and queues together with node
struct node : reclaimer::template enable_concurrent_ptr<node> {
  // A queue segment: a fixed array of `entries_per_node` raw storage slots,
  // coordinated by two bounded SCQ index queues:
  //   _allocated_queue - indices of slots currently holding a live T (FIFO order),
  //   _free_queue      - indices of slots available for a new element.
  // Segments are linked via _next to form the unbounded queue.

  // Creates an empty segment: allocated queue empty, free queue full.
  node() :
    _storage(new storage_t[entries_per_node]),
    _allocated_queue(entries_per_node, remap_shift, detail::nikolaev_scq::empty_tag{}),
    _free_queue(entries_per_node, remap_shift, detail::nikolaev_scq::first_empty_tag{}) {}

  // NOTE(review): the default ctor above uses full_tag{} for _free_queue in the
  // original - see byte-identical line below; comment block kept separate so the
  // code is untouched.

  // Creates a segment that already contains `value` in slot 0; the index queues
  // are tagged so that slot 0 counts as used and the remaining slots as free.
  // Used by push() when linking a fresh node after the tail node was finalized.
  explicit node(value_type&& value) :
    _storage(new storage_t[entries_per_node]),
    _allocated_queue(entries_per_node, remap_shift, detail::nikolaev_scq::first_used_tag{}),
    _free_queue(entries_per_node, remap_shift, detail::nikolaev_scq::first_empty_tag{}) {
    new (&_storage[0]) T(std::move(value));
  }

  // Drains the allocated queue and destroys every element that was never popped.
  ~node() override {
    std::uint64_t eidx;
    while (_allocated_queue.dequeue<false, pop_retries>(eidx, entries_per_node, remap_shift)) {
      reinterpret_cast<T&>(_storage[eidx]).~T();
    }
  }

  // Takes back the initial value out of a node constructed with node(value_type&&)
  // that lost the race to be linked (see push()). Must succeed - the node holds
  // exactly the one element placed by the constructor.
  void steal_init_value(value_type& value) {
    bool success = try_pop(value);
    (void)success;
    assert(success);
  }

  // Tries to place `value` into a free slot of this segment.
  // Returns false when the segment is full or has been finalized; in either
  // failure case `value` is guaranteed to still hold the element afterwards.
  bool try_push(value_type&& value) {
    std::uint64_t eidx;
    if (!_free_queue.dequeue<false, pop_retries>(eidx, entries_per_node, remap_shift)) {
      // No free slot left - finalize the allocated queue so no later enqueue can
      // succeed on this segment, signalling push() to append a new node.
      _allocated_queue.finalize();
      return false;
    }

    assert(eidx < entries_per_node);
    new (&_storage[eidx]) T(std::move(value));
    if (!_allocated_queue.enqueue<false, true>(eidx, entries_per_node, remap_shift)) {
      // queue has been finalized
      // we have already moved the value, so we need to move it back and
      // destroy the created storage item.
      T& data = reinterpret_cast<T&>(_storage[eidx]);
      value = std::move(data);
      data.~T(); // NOLINT (use-after-move)
      _free_queue.enqueue<false, false>(eidx, entries_per_node, remap_shift);
      return false;
    }
    return true;
  }

  // Tries to remove the oldest element of this segment into `result`.
  // Returns false when the allocated queue yields no index (segment empty,
  // or finalized and drained).
  bool try_pop(value_type& result) {
    std::uint64_t eidx;
    if (!_allocated_queue.dequeue<false, pop_retries>(eidx, entries_per_node, remap_shift)) {
      return false;
    }

    assert(eidx < entries_per_node);
    T& data = reinterpret_cast<T&>(_storage[eidx]);
    result = std::move(data);
    data.~T(); // NOLINT (use-after-move)
    // Return the now-empty slot to the free queue for reuse.
    _free_queue.enqueue<false, false>(eidx, entries_per_node, remap_shift);
    return true;
  }

  std::unique_ptr<storage_t[]> _storage;      // raw slot array, entries_per_node entries
  detail::nikolaev_scq _allocated_queue;      // indices of live elements
  detail::nikolaev_scq _free_queue;           // indices of reusable slots

  concurrent_ptr _next;                       // successor segment (nullptr at the tail)
};
171 
172  concurrent_ptr _tail;
173  concurrent_ptr _head;
174 };
175 
template <class T, class... Policies>
// NOTE(review): the constructor signature line (hpp:177, presumably
// `nikolaev_queue<T, Policies...>::nikolaev_queue() {`) appears to be missing
// from this extract - confirm against the original header.
// Body: start the queue with a single empty segment; _head and _tail both
// point at it (relaxed stores suffice - no other thread can see the queue yet).
  auto n = new node();
  _tail.store(n, std::memory_order_relaxed);
  _head.store(n, std::memory_order_relaxed);
}
182 
183 template <class T, class... Policies>
184 nikolaev_queue<T, Policies...>::~nikolaev_queue() {
185  auto h = _head.load(std::memory_order_relaxed).get();
186  while (h != nullptr) {
187  auto next = h->_next.load(std::memory_order_relaxed).get();
188  delete h;
189  h = next;
190  }
191 }
192 
template <class T, class... Policies>
void nikolaev_queue<T, Policies...>::push(value_type value) {
  // Acquire the current tail segment and try to push into it; when the segment
  // is full/finalized, link a new segment carrying the value and swing _tail.
  guard_ptr n;
  for (;;) {
    // (1) - this acquire-load synchronizes-with the release-CAS (3, 5)
    n.acquire(_tail, std::memory_order_acquire);
    if (n->_next.load(std::memory_order_relaxed) != nullptr) {
      // _tail lags behind the real tail - help advance it and retry.
      // (2) - this acquire-load synchronizes-with the release-CAS (4)
      const auto next = n->_next.load(std::memory_order_acquire);
      marked_ptr expected = n;
      // (3) - this release-CAS synchronizes with the acquire-load (1)
      _tail.compare_exchange_weak(expected, next, std::memory_order_release, std::memory_order_relaxed);
      continue;
    }

    if (n->try_push(std::move(value))) {
      return;
    }

    // try_push failed (segment full and now finalized) and guarantees that
    // `value` was moved back, so reusing it here is safe.
    auto next = new node(std::move(value)); // NOLINT (use-after-move)
    marked_ptr expected{nullptr};
    // (4) - this release-CAS synchronizes-with the acquire-load (2, 7)
    if (n->_next.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed)) {
      expected = n;
      // (5) - this release-CAS synchronizes-with the acquire-load (1)
      // Best effort - if this CAS fails, some other thread already advanced _tail.
      _tail.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed);
      return;
    }
    // Another thread linked its node first: recover our value from the
    // speculatively created node, discard it, and retry from the new tail.
    next->steal_init_value(value);
    delete next;
  }
}
225 
template <class T, class... Policies>
bool nikolaev_queue<T, Policies...>::try_pop(value_type& result) {
  // Pop from the head segment; when it is drained and a successor exists,
  // advance _head and retire the old segment via the reclaimer.
  guard_ptr n;
  for (;;) {
    // (6) - this acquire-load synchronizes-with the release-CAS (8)
    n.acquire(_head, std::memory_order_acquire);
    if (n->try_pop(result)) {
      return true;
    }
    if (n->_next.load(std::memory_order_relaxed) == nullptr) {
      // Head segment empty and no successor - the queue is empty.
      return false;
    }

    // A successor exists, so this segment is (being) finalized. Raise the SCQ
    // threshold and retry once so a concurrently completing push on this
    // segment is not lost before we move _head past it.
    // NOTE(review): the constant 3*n-1 matches the threshold from Nikolaev's
    // SCQ algorithm - confirm against detail::nikolaev_scq.
    n->_allocated_queue.set_threshold(3 * entries_per_node - 1);
    if (n->try_pop(result)) {
      return true;
    }

    // (7) - this acquire-load synchronizes-with (4)
    const auto next = n->_next.load(std::memory_order_acquire);
    marked_ptr expected = n;
    // (8) - this release-CAS synchronizes-with the acquire-load (6)
    if (_head.compare_exchange_weak(expected, next, std::memory_order_release, std::memory_order_relaxed)) {
      // We unlinked the old head - hand it to the reclaimer; it is deleted
      // once no thread holds a guard_ptr to it anymore.
      n.reclaim();
    }
  }
}
253 } // namespace xenium
254 
255 #endif
xenium::nikolaev_queue::push
void push(value_type value)
Pushes the given value.
Definition: nikolaev_queue.hpp:194
xenium::policy::entries_per_node
Policy to configure the number of entries per allocated node in ramalhete_queue.
Definition: policy.hpp:103
xenium::nikolaev_queue::try_pop
bool try_pop(value_type &result)
Tries to pop an element from the queue.
Definition: nikolaev_queue.hpp:227
xenium::policy::pop_retries
Policy to configure the number of iterations to spin on a queue entry while waiting for a pending push operation to finish.
Definition: policy.hpp:124
xenium::policy::reclaimer
Policy to configure the reclamation scheme to be used.
Definition: policy.hpp:25
xenium::nikolaev_queue
An unbounded lock-free multi-producer/multi-consumer queue.
Definition: nikolaev_queue.hpp:48