6 #ifndef XENIUM_NIKOLAEV_QUEUE_HPP
7 #define XENIUM_NIKOLAEV_QUEUE_HPP
9 #include <xenium/parameter.hpp>
10 #include <xenium/policy.hpp>
11 #include <xenium/utils.hpp>
13 #include <xenium/detail/nikolaev_scq.hpp>
// nikolaev_queue<T, Policies...> -- unbounded multi-producer/multi-consumer
// FIFO queue assembled from a linked list of bounded segments (struct node
// below), each segment backed by a pair of detail::nikolaev_scq index queues.
// NOTE(review): this chunk is an elided fragment -- several original lines
// (including the class head and the constant initializers) are not visible.
47 template <
class T,
class... Policies>
// Memory-reclamation policy extracted from Policies...; it is mandatory
// (enforced by the static_assert below) because nodes are freed concurrently.
51 using reclaimer = parameter::type_param_t<
policy::reclaimer, parameter::nil, Policies...>;
// Number of retries the inner SCQ dequeue performs before giving up
// (initializer elided from this view).
52 static constexpr
unsigned pop_retries =
// Slot count of each segment (initializer elided).  Must be a power of two
// so indexes can be remapped with shifts (see remap_shift below).
54 static constexpr
unsigned entries_per_node =
57 static_assert(utils::is_power_of_two(entries_per_node),
"entries_per_node must be a power of two");
58 static_assert(parameter::is_set<reclaimer>::value,
"reclaimer policy must be specified");
// Rebind helper: the same queue type with additional/overriding policies.
60 template <
class... NewPolicies>
// Appends `value` at the back of the queue (definition further below).
79 void push(value_type value);
// Moves the front element into `result`; returns false when the queue is
// observed empty (definition further below).
89 bool try_pop(value_type& result);
// Pointer vocabulary supplied by the reclaimer: concurrent_ptr is the
// shared atomic pointer, marked_ptr its plain value form, guard_ptr pins a
// node against concurrent reclamation while a thread operates on it.
94 using concurrent_ptr =
typename reclaimer::template concurrent_ptr<node, 0>;
95 using marked_ptr =
typename concurrent_ptr::marked_ptr;
96 using guard_ptr =
typename concurrent_ptr::guard_ptr;
// Raw, properly aligned storage for one T; elements are created/destroyed
// manually via placement-new and explicit ~T() inside node.
98 using storage_t =
typename std::aligned_storage<
sizeof(T),
alignof(T)>::type;
// Shift used by nikolaev_scq to remap slot indexes inside a segment.
100 static constexpr
unsigned remap_shift = detail::nikolaev_scq::calc_remap_shift(entries_per_node);
// One bounded segment of the queue.  A node owns entries_per_node raw slots
// (_storage) plus two SCQ index queues:
//   _allocated_queue -- indexes of slots currently holding a constructed T
//   _free_queue      -- indexes of currently unused slots
// try_push moves an index free -> allocated; try_pop moves it back.
103 struct node : reclaimer::template enable_concurrent_ptr<node> {
// Default construction: every slot free, none allocated (empty/full tags).
// NOTE(review): the constructor's signature line is elided from this view.
105 _storage(
new storage_t[entries_per_node]),
106 _allocated_queue(entries_per_node, remap_shift, detail::nikolaev_scq::empty_tag{}),
107 _free_queue(entries_per_node, remap_shift, detail::nikolaev_scq::full_tag{}) {}
// Construction used when push() grows the list: slot 0 is pre-populated
// with `value`, and the first_used/first_empty tags record that one slot
// is already taken.
109 explicit node(value_type&& value) :
110 _storage(
new storage_t[entries_per_node]),
111 _allocated_queue(entries_per_node, remap_shift, detail::nikolaev_scq::first_used_tag{}),
112 _free_queue(entries_per_node, remap_shift, detail::nikolaev_scq::first_empty_tag{}) {
113 new (&_storage[0]) T(std::move(value));
// Destructor body fragment (signature elided): drains _allocated_queue and
// destroys every element that is still constructed in its slot.
118 while (_allocated_queue.dequeue<
false, pop_retries>(eidx, entries_per_node, remap_shift)) {
119 reinterpret_cast<T&
>(_storage[eidx]).~T();
// Called from push() after this freshly built node could not be used
// (see push below); presumably moves the pre-seeded initial value back out
// into `value` so the caller can retry -- body elided, confirm in full file.
123 void steal_init_value(value_type& value) {
// Claims a free slot and constructs `value` in it.  Returns false when the
// segment has no free slot left; in that case the allocated queue is
// finalized so producers stop using this node.
129 bool try_push(value_type&& value) {
131 if (!_free_queue.dequeue<
false, pop_retries>(eidx, entries_per_node, remap_shift)) {
// Segment full -> close it for further enqueues.
132 _allocated_queue.finalize();
136 assert(eidx < entries_per_node);
// Construct the element in the claimed slot, then publish its index.
137 new (&_storage[eidx]) T(std::move(value));
138 if (!_allocated_queue.enqueue<
false,
true>(eidx, entries_per_node, remap_shift)) {
// Publishing failed (queue finalized concurrently): move the value back
// out to the caller and hand the slot back to the free queue.
// NOTE(review): destruction of the moved-from slot element happens on an
// elided line -- verify against the full file.
142 T& data =
reinterpret_cast<T&
>(_storage[eidx]);
143 value = std::move(data);
145 _free_queue.enqueue<
false,
false>(eidx, entries_per_node, remap_shift);
// Takes one element out of this segment into `result`.  Returns false when
// no allocated slot could be dequeued (segment observed empty).
151 bool try_pop(value_type& result) {
153 if (!_allocated_queue.dequeue<
false, pop_retries>(eidx, entries_per_node, remap_shift)) {
157 assert(eidx < entries_per_node);
158 T& data =
reinterpret_cast<T&
>(_storage[eidx]);
159 result = std::move(data);
// Slot is handed back to the free queue for reuse.  NOTE(review): the
// explicit ~T() for the moved-from element sits on an elided line.
161 _free_queue.enqueue<
false,
false>(eidx, entries_per_node, remap_shift);
// Owned raw slot storage; element lifetimes are managed manually above.
165 std::unique_ptr<storage_t[]> _storage;
166 detail::nikolaev_scq _allocated_queue;
167 detail::nikolaev_scq _free_queue;
// Link to the successor segment (nullptr while this node is the last one).
169 concurrent_ptr _next;
// Queue anchors (members of nikolaev_queue, after node's elided closing
// brace): producers work at _tail's node, consumers at _head's node.
172 concurrent_ptr _tail;
173 concurrent_ptr _head;
// Constructor fragment (signature and the allocation of `n` are elided):
// both _tail and _head initially reference the same node `n` -- presumably
// a single freshly allocated empty segment; confirm in the full file.
// Relaxed stores suffice here: no other thread can see the queue yet.
176 template <
class T,
class... Policies>
179 _tail.store(n, std::memory_order_relaxed);
180 _head.store(n, std::memory_order_relaxed);
// Destructor: walks the segment list starting at _head and releases every
// node.  Runs single-threaded by contract (destruction), hence the relaxed
// loads.  NOTE(review): the per-node delete/advance statements are on
// elided lines.
183 template <
class T,
class... Policies>
184 nikolaev_queue<T, Policies...>::~nikolaev_queue() {
185 auto h = _head.load(std::memory_order_relaxed).get();
186 while (h !=
nullptr) {
187 auto next = h->_next.load(std::memory_order_relaxed).get();
// push fragment -- appends `value` at the tail.  Visible scheme (the
// enclosing retry loop and several branches are elided):
//   1. pin the current tail node with a guard_ptr
//   2. if that node already has a successor, _tail is stale: help advance
//      it via CAS and retry
//   3. try_push into the pinned node; success ends the operation
//   4. node full: build a new node pre-seeded with the value, try to link
//      it through _next; on success also try to swing _tail; otherwise
//      recover the value via steal_init_value and retry.
193 template <
class T,
class... Policies>
// Pin the tail node so the reclaimer cannot free it while we use it.
198 n.acquire(_tail, std::memory_order_acquire);
199 if (n->_next.load(std::memory_order_relaxed) !=
nullptr) {
// _tail lags behind the real tail -- help move it forward.
201 const auto next = n->_next.load(std::memory_order_acquire);
202 marked_ptr expected = n;
204 _tail.compare_exchange_weak(expected, next, std::memory_order_release, std::memory_order_relaxed);
208 if (n->try_push(std::move(value))) {
// Current segment is full: allocate a successor already holding the value.
212 auto next =
new node(std::move(value));
213 marked_ptr expected{
nullptr};
215 if (n->_next.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed)) {
// Linked successfully; also try to advance _tail to the new node.
// NOTE(review): `expected` is presumably re-seeded on an elided line
// before this CAS -- verify against the full file.
218 _tail.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed);
// Presumably the CAS-failure branch (another producer linked a node
// first): take the value back out of our orphaned node and retry; its
// deletion happens on an elided line.
221 next->steal_init_value(value);
// try_pop fragment -- moves the front element into `result`, returning
// false when the queue is observed empty.  Visible scheme (retry loop and
// tail of the function are elided / cut off at this chunk's end):
//   1. pin the head node with a guard_ptr
//   2. fast path: try_pop from that node
//   3. if that failed and there is no successor, raise the SCQ dequeue
//      threshold and retry once before reporting empty
//   4. otherwise the head segment is drained: CAS _head to the successor
//      and retry.
226 template <
class T,
class... Policies>
231 n.acquire(_head, std::memory_order_acquire);
232 if (n->try_pop(result)) {
235 if (n->_next.load(std::memory_order_relaxed) ==
nullptr) {
// Last segment looked empty: reset the threshold to 3n-1 (the value
// prescribed by the SCQ algorithm) so a dequeue racing with a concurrent
// enqueue is not missed, then try once more.
239 n->_allocated_queue.set_threshold(3 * entries_per_node - 1);
240 if (n->try_pop(result)) {
// Head segment exhausted and a successor exists: advance _head.
245 const auto next = n->_next.load(std::memory_order_acquire);
246 marked_ptr expected = n;
248 if (_head.compare_exchange_weak(expected, next, std::memory_order_release, std::memory_order_relaxed)) {