#ifndef XENIUM_RAMALHETE_QUEUE_HPP
#define XENIUM_RAMALHETE_QUEUE_HPP

#include <xenium/acquire_guard.hpp>
#include <xenium/backoff.hpp>
#include <xenium/detail/pointer_queue_traits.hpp>
#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>

#include <atomic>
#include <stdexcept>
#include <tuple>
#include <type_traits>

#ifdef _MSC_VER
  #pragma warning(push)
  #pragma warning(disable : 4324) // structure was padded due to alignment specifier
#endif

namespace xenium {
template <class T, class... Policies>
class ramalhete_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  using backoff = parameter::type_param_t<policy::backoff, no_backoff, Policies...>;
  // Default values below (512 entries per node, 1000 pop retries) are assumed defaults;
  // both can be overridden via policy::entries_per_node and policy::pop_retries.
  static constexpr unsigned entries_per_node =
    parameter::value_param_t<unsigned, policy::entries_per_node, 512, Policies...>::value;
  static constexpr unsigned pop_retries =
    parameter::value_param_t<unsigned, policy::pop_retries, 1000, Policies...>::value;

  static_assert(entries_per_node > 0, "entries_per_node must be greater than zero");
  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");
  template <class... NewPolicies>
  using with = ramalhete_queue<T, NewPolicies..., Policies...>;

  ramalhete_queue();
  ~ramalhete_queue();

  // Pushes the given value; throws std::invalid_argument if the value is a nullptr.
  void push(value_type value);
  // Tries to pop an element; returns true on success, false if the queue appears empty.
  [[nodiscard]] bool try_pop(value_type& result);
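  // Usage sketch (the reclaimer name below, xenium::reclamation::generic_epoch_based<>, is an
  // assumption - substitute whichever reclaimer your xenium version provides):
  //
  //   using queue_t = xenium::ramalhete_queue<
  //     int*,
  //     xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>,
  //     xenium::policy::entries_per_node<256>>;
  //
  //   queue_t q;
  //   q.push(new int(42));
  //   int* v = nullptr;
  //   if (q.try_pop(v)) {
  //     delete v;
  //   }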
private:
  struct node;

  using concurrent_ptr = typename reclaimer::template concurrent_ptr<node, 0>;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  // Values are stored as marked pointers; the mark bit flags an entry that a consumer has
  // invalidated after giving up waiting for the producer.
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 1>;

  struct entry {
    std::atomic<marked_value> value;
  };
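  // Both indices advance in increments of step_size and wrap modulo entries_per_node, so
  // consecutive operations touch entries spread across the array rather than adjacent slots,
  // which reduces false sharing between concurrent producers and consumers.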
  static constexpr unsigned step_size = 11;
  static constexpr unsigned max_idx = step_size * entries_per_node;
  struct node : reclaimer::template enable_concurrent_ptr<node> {
    std::atomic<unsigned> pop_idx;
    entry entries[entries_per_node];
    std::atomic<unsigned> push_idx;
    concurrent_ptr next;

    // The first entry is pre-filled with the pushed item; push_idx therefore starts at
    // step_size (i.e., one claimed slot).
    explicit node(raw_value_type item) : pop_idx{0}, push_idx{step_size}, next{nullptr} {
      entries[0].value.store(item, std::memory_order_relaxed);
      for (unsigned i = 1; i < entries_per_node; i++) {
        entries[i].value.store(nullptr, std::memory_order_relaxed);
      }
    }
    ~node() {
      for (unsigned i = pop_idx; i < push_idx; i += step_size) {
        traits::delete_value(entries[i % entries_per_node].value.load(std::memory_order_relaxed).get());
      }
    }
  };
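  // _head and _tail are kept on separate cache lines so producers updating the tail do not
  // cause false sharing with consumers reading the head.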
  alignas(64) concurrent_ptr _head;
  alignas(64) concurrent_ptr _tail;
};
template <class T, class... Policies>
ramalhete_queue<T, Policies...>::ramalhete_queue() {
  // Start with a single empty node: it is constructed with a nullptr dummy entry and its
  // push_idx is reset to 0 so that slot can be claimed normally.
  auto n = new node(nullptr);
  n->push_idx.store(0, std::memory_order_relaxed);
  _head.store(n, std::memory_order_relaxed);
  _tail.store(n, std::memory_order_relaxed);
}
template <class T, class... Policies>
ramalhete_queue<T, Policies...>::~ramalhete_queue() {
  // Not synchronized with concurrent accesses - destroy all remaining nodes.
  auto n = _head.load(std::memory_order_acquire);
  while (n) {
    auto next = n->next.load(std::memory_order_acquire);
    delete n.get();
    n = next;
  }
}
template <class T, class... Policies>
void ramalhete_queue<T, Policies...>::push(value_type value) {
  raw_value_type raw_val = traits::get_raw(value);
  if (raw_val == nullptr) {
    throw std::invalid_argument("value can not be nullptr");
  }
  backoff backoff;

  guard_ptr t;
  for (;;) {
    t.acquire(_tail, std::memory_order_acquire);

    unsigned idx = t->push_idx.fetch_add(step_size, std::memory_order_relaxed);
    if (idx >= max_idx) {
      // This node is full.
      if (t != _tail.load(std::memory_order_relaxed)) {
        continue; // Some other thread already added a new node.
      }

      auto next = t->next.load(std::memory_order_relaxed);
      if (next == nullptr) {
        node* new_node = new node(raw_val);
        traits::release(value);

        marked_ptr expected = nullptr;
        if (t->next.compare_exchange_strong(expected, new_node, std::memory_order_release, std::memory_order_relaxed)) {
          expected = t;
          // Try to swing _tail to the new node; a failed CAS means another thread helped already.
          _tail.compare_exchange_strong(expected, new_node, std::memory_order_release, std::memory_order_relaxed);
          return;
        }
        // Another thread appended a node first - reset push_idx so ~node() does not destroy
        // the pre-stored value, then discard our node and retry.
        new_node->push_idx.store(0, std::memory_order_relaxed);
        delete new_node;
      } else {
        next = t->next.load(std::memory_order_acquire);
        marked_ptr expected = t;
        _tail.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed);
      }
      continue;
    }
    idx %= entries_per_node;

    marked_value expected = nullptr;
    // Publish the value; a failed CAS means a consumer already invalidated this slot.
    if (t->entries[idx].value.compare_exchange_strong(
          expected, raw_val, std::memory_order_release, std::memory_order_relaxed)) {
      traits::release(value);
      return;
    }

    backoff();
  }
}
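// try_pop mirrors push: it claims a slot index in the head node via fetch_add and then reads
// (or invalidates) the value in that slot. A drained node is unlinked from _head and handed
// to the reclaimer once a successor exists.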
template <class T, class... Policies>
bool ramalhete_queue<T, Policies...>::try_pop(value_type& result) {
  backoff backoff;

  guard_ptr h;
  for (;;) {
    h.acquire(_head, std::memory_order_acquire);

    const auto pop_idx = h->pop_idx.load(std::memory_order_acquire);
    const auto push_idx = h->push_idx.load(std::memory_order_relaxed);
    // The queue is empty if the head node is drained and has no successor.
    if (pop_idx >= push_idx && h->next.load(std::memory_order_relaxed) == nullptr) {
      break;
    }

    unsigned idx = h->pop_idx.fetch_add(step_size, std::memory_order_release);
    if (idx >= max_idx) {
      // This node has been drained; move on to its successor if one exists.
      auto next = h->next.load(std::memory_order_acquire);
      if (next == nullptr) {
        break; // No more nodes in the queue.
      }

      marked_ptr expected = h;
      if (_head.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed)) {
        h.reclaim(); // The old node has been unlinked - hand it to the reclaimer.
      }
      continue;
    }
    idx %= entries_per_node;

    auto value = h->entries[idx].value.load(std::memory_order_relaxed);
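    // A nullptr here means the producer that claimed this slot has not stored its value yet;
    // optionally spin for a bounded number of retries before invalidating the slot.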
    if constexpr (pop_retries > 0) {
      unsigned cnt = 0;
      ramalhete_queue::backoff retry_backoff;
      while (value == nullptr && ++cnt <= pop_retries) {
        value = h->entries[idx].value.load(std::memory_order_relaxed);
        retry_backoff(); // Give the pushing thread a chance to finish its store.
      }
    }
    if (value != nullptr) {
      // Acquire-load to synchronize with the releasing CAS in push before handing out the value.
      std::ignore = h->entries[idx].value.load(std::memory_order_acquire);
      traits::store(result, value.get());
      return true;
    }
    // Invalidate the slot with a marked nullptr; if a racing producer stored a value in the
    // meantime, the exchange still returns it.
    value = h->entries[idx].value.exchange(marked_value(nullptr, 1), std::memory_order_acquire);
    if (value != nullptr) {
      traits::store(result, value.get());
      return true;
    }

    backoff();
  }

  return false;
}
} // namespace xenium

#ifdef _MSC_VER
  #pragma warning(pop)
#endif

#endif