xenium/ramalhete_queue.hpp
//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_RAMALHETE_QUEUE_HPP
#define XENIUM_RAMALHETE_QUEUE_HPP

#include <xenium/acquire_guard.hpp>
#include <xenium/backoff.hpp>
#include <xenium/detail/pointer_queue_traits.hpp>
#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>

#include <algorithm>
#include <atomic>
#include <stdexcept>
#include <tuple>

#ifdef _MSC_VER
  #pragma warning(push)
  #pragma warning(disable : 4324) // structure was padded due to alignment specifier
#endif

namespace xenium {

/**
 * A fast unbounded lock-free multi-producer/multi-consumer FIFO queue.
 */
template <class T, class... Policies>
class ramalhete_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  using backoff = parameter::type_param_t<policy::backoff, no_backoff, Policies...>;
  static constexpr unsigned entries_per_node =
    parameter::value_param_t<unsigned, policy::entries_per_node, 512, Policies...>::value;
  static constexpr unsigned pop_retries =
    parameter::value_param_t<unsigned, policy::pop_retries, 1000, Policies...>::value;

  static_assert(entries_per_node > 0, "entries_per_node must be greater than zero");
  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");

  template <class... NewPolicies>
  using with = ramalhete_queue<T, NewPolicies..., Policies...>;

  ramalhete_queue();
  ~ramalhete_queue();

  /**
   * @brief Pushes the given value to the queue.
   * @param value the value to push; must not be nullptr
   */
  void push(value_type value);

  /**
   * @brief Tries to pop an object from the queue.
   * @param result the reference that receives the popped value if the operation succeeds
   * @return `true` if the operation was successful, otherwise `false`
   */
  [[nodiscard]] bool try_pop(value_type& result);

private:
  struct node;

  using concurrent_ptr = typename reclaimer::template concurrent_ptr<node, 0>;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  // TODO - use type from traits
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 1>;

  struct entry {
    std::atomic<marked_value> value;
  };

  // TODO - make this configurable via policy.
  static constexpr unsigned step_size = 11;
  static constexpr unsigned max_idx = step_size * entries_per_node;

  struct node : reclaimer::template enable_concurrent_ptr<node> {
    // pop_idx and push_idx are incremented by step_size to avoid false sharing, so the
    // actual index has to be calculated modulo entries_per_node
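    // Example (default entries_per_node = 512): successive operations claim raw indices
    // 0, 11, 22, ..., i.e. slots that are 11 entries (typically 88 bytes, more than a
    // cache line) apart, so concurrent threads tend to touch different cache lines.
    // Because 11 is coprime to 512, every slot is still claimed exactly once before the
    // indices reach max_idx.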
    std::atomic<unsigned> pop_idx;
    entry entries[entries_per_node];
    std::atomic<unsigned> push_idx;
    concurrent_ptr next;

    // Start with the first entry pre-filled
    explicit node(raw_value_type item) : pop_idx{0}, push_idx{step_size}, next{nullptr} {
      entries[0].value.store(item, std::memory_order_relaxed);
      for (unsigned i = 1; i < entries_per_node; i++) {
        entries[i].value.store(nullptr, std::memory_order_relaxed);
      }
    }

    ~node() override {
      for (unsigned i = pop_idx; i < push_idx; i += step_size) {
        traits::delete_value(entries[i % entries_per_node].value.load(std::memory_order_relaxed).get());
      }
    }
  };

  alignas(64) concurrent_ptr _head;
  alignas(64) concurrent_ptr _tail;
};

template <class T, class... Policies>
ramalhete_queue<T, Policies...>::ramalhete_queue() {
  auto n = new node(nullptr);
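  // The queue initially consists of a single empty node that serves as both head and tail.
  // node(nullptr) pre-fills entry 0 and sets push_idx to step_size, so push_idx is reset
  // here to make the node appear completely empty.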
  n->push_idx.store(0, std::memory_order_relaxed);
  _head.store(n, std::memory_order_relaxed);
  _tail.store(n, std::memory_order_relaxed);
}

template <class T, class... Policies>
ramalhete_queue<T, Policies...>::~ramalhete_queue() {
  // (1) - this acquire-load synchronizes-with the release-CAS (13)
  auto n = _head.load(std::memory_order_acquire);
  while (n) {
    // (2) - this acquire-load synchronizes-with the release-CAS (4)
    auto next = n->next.load(std::memory_order_acquire);
    delete n.get();
    n = next;
  }
}

template <class T, class... Policies>
void ramalhete_queue<T, Policies...>::push(value_type value) {
  raw_value_type raw_val = traits::get_raw(value);
  if (raw_val == nullptr) {
    throw std::invalid_argument("value can not be nullptr");
  }

  backoff backoff;
  guard_ptr t;
  for (;;) {
    // (3) - this acquire-load synchronizes-with the release-CAS (5, 7)
    t.acquire(_tail, std::memory_order_acquire);

    unsigned idx = t->push_idx.fetch_add(step_size, std::memory_order_relaxed);
    if (idx >= max_idx) {
      // This node is full
      if (t != _tail.load(std::memory_order_relaxed)) {
        continue; // some other thread already added a new node.
      }

      auto next = t->next.load(std::memory_order_relaxed);
      if (next == nullptr) {
        node* new_node = new node(raw_val);
        traits::release(value);

        marked_ptr expected = nullptr;
        // (4) - this release-CAS synchronizes-with the acquire-load (2, 6, 12)
        if (t->next.compare_exchange_strong(expected, new_node, std::memory_order_release, std::memory_order_relaxed)) {
          expected = t;
          // (5) - this release-CAS synchronizes-with the acquire-load (3)
          _tail.compare_exchange_strong(expected, new_node, std::memory_order_release, std::memory_order_relaxed);
          return;
        }
        // prevent the pre-stored value from being deleted
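        // (the node destructor only deletes values in the range [pop_idx, push_idx), so
        // resetting push_idx to 0 keeps it from deleting the value pre-stored in entry 0;
        // the outer loop will simply retry pushing raw_val)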
        new_node->push_idx.store(0, std::memory_order_relaxed);
        // some other thread already added a new node
        delete new_node;
      } else {
        // (6) - this acquire-load synchronizes-with the release-CAS (4)
        next = t->next.load(std::memory_order_acquire);
        marked_ptr expected = t;
        // (7) - this release-CAS synchronizes-with the acquire-load (3)
        _tail.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed);
      }
      continue;
    }
    idx %= entries_per_node;

    marked_value expected = nullptr;
    // (8) - this release-CAS synchronizes-with the acquire-load (14) and the acquire-exchange (15)
    if (t->entries[idx].value.compare_exchange_strong(
          expected, raw_val, std::memory_order_release, std::memory_order_relaxed)) {
      traits::release(value);
      return;
    }

    backoff();
  }
}

template <class T, class... Policies>
bool ramalhete_queue<T, Policies...>::try_pop(value_type& result) {
  backoff backoff;

  guard_ptr h;
  for (;;) {
    // (9) - this acquire-load synchronizes-with the release-CAS (13)
    h.acquire(_head, std::memory_order_acquire);

    // (10) - this acquire-load synchronizes-with the release-fetch-add (11)
    const auto pop_idx = h->pop_idx.load(std::memory_order_acquire);
    // This synchronization is necessary to avoid a situation where we see an up-to-date
    // pop_idx, but an out-of-date push_idx and would (falsely) assume that the queue is empty.
    const auto push_idx = h->push_idx.load(std::memory_order_relaxed);
    if (pop_idx >= push_idx && h->next.load(std::memory_order_relaxed) == nullptr) {
      break;
    }

    // (11) - this release-fetch-add synchronizes with the acquire-load (10)
    unsigned idx = h->pop_idx.fetch_add(step_size, std::memory_order_release);
    if (idx >= max_idx) {
      // This node has been drained, check if there is another one
      // (12) - this acquire-load synchronizes-with the release-CAS (4)
      auto next = h->next.load(std::memory_order_acquire);
      if (next == nullptr) {
        break; // No more nodes in the queue
      }

      marked_ptr expected = h;
      // (13) - this release-CAS synchronizes-with the acquire-load (1, 9)
      if (_head.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed)) {
        h.reclaim(); // The old node has been unlinked -> reclaim it.
      }

      continue;
    }
    idx %= entries_per_node;

    auto value = h->entries[idx].value.load(std::memory_order_relaxed);
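    // A consumer can claim a slot whose producer has already incremented push_idx but has
    // not yet stored its value via (8). Optionally spin for a bounded number of iterations
    // to give such a pending push a chance to complete before the slot is invalidated below.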
    if constexpr (pop_retries > 0) {
      unsigned cnt = 0;
      ramalhete_queue::backoff retry_backoff;
      while (value == nullptr && ++cnt <= pop_retries) {
        value = h->entries[idx].value.load(std::memory_order_relaxed);
        retry_backoff(); // TODO - use a backoff type that can be configured separately
      }
    }

    if (value != nullptr) {
      // (14) - this acquire-load synchronizes-with the release-CAS (8)
      std::ignore = h->entries[idx].value.load(std::memory_order_acquire);
      traits::store(result, value.get());
      return true;
    }

    // (15) - this acquire-exchange synchronizes-with the release-CAS (8)
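    // Invalidate the slot by storing a marked null pointer. If a pending push has not yet
    // installed its value, its CAS (8) - which expects an unmarked nullptr - will now fail,
    // and that producer will retry with a freshly claimed index, so no value is lost.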
    value = h->entries[idx].value.exchange(marked_value(nullptr, 1), std::memory_order_acquire);
    if (value != nullptr) {
      traits::store(result, value.get());
      return true;
    }

    backoff();
  }

  return false;
}
} // namespace xenium

#ifdef _MSC_VER
  #pragma warning(pop)
#endif

#endif
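
Usage sketch (not part of the header): a minimal, hedged example of how the queue might be instantiated and used. It assumes the epoch-based reclaimer shipped with xenium is available under <xenium/reclamation/generic_epoch_based.hpp>; any other xenium reclaimer policy would be plugged in the same way.

#include <xenium/ramalhete_queue.hpp>
#include <xenium/reclamation/generic_epoch_based.hpp>

#include <cassert>

int main() {
  // T must be a pointer-like type (e.g. a raw pointer); the reclaimer policy is mandatory.
  xenium::ramalhete_queue<int*, xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>> queue;

  queue.push(new int{42});

  int* value = nullptr;
  if (queue.try_pop(value)) {
    assert(*value == 42);
    delete value; // ownership of the popped pointer is transferred to the caller
  }
}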