#ifndef XENIUM_KIRSCH_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <type_traits>

namespace xenium {
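/**
 * @brief A lock-free k-FIFO queue, based on the algorithm by Kirsch, Lippautz,
 * and Payer ("Fast and Scalable, Lock-Free k-FIFO Queues").
 *
 * A k-FIFO queue relaxes strict FIFO order: elements may be dequeued up to
 * k-1 positions out of order. Internally the queue is a linked list of
 * segments with k slots each; producers insert into a random empty slot of
 * the tail segment, consumers remove from a random occupied slot of the head
 * segment.
 */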
template <class T, class... Policies>
class kirsch_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");
  template <class... NewPolicies>
  using with = kirsch_kfifo_queue<T, NewPolicies..., Policies...>;

  explicit kirsch_kfifo_queue(uint64_t k);
  ~kirsch_kfifo_queue();

  kirsch_kfifo_queue(const kirsch_kfifo_queue&) = delete;
  kirsch_kfifo_queue(kirsch_kfifo_queue&&) = delete;

  kirsch_kfifo_queue& operator=(const kirsch_kfifo_queue&) = delete;
  kirsch_kfifo_queue& operator=(kirsch_kfifo_queue&&) = delete;
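
  /**
   * @brief Pushes the given value to the queue.
   *
   * @throws std::invalid_argument if value is nullptr.
   */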
  void push(value_type value);
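
  /**
   * @brief Tries to pop an element from the queue.
   *
   * @return true if an element was popped into result, false if the queue appeared empty.
   */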
  [[nodiscard]] bool try_pop(value_type& result);
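
  // Usage sketch -- a minimal example, assuming the generic_epoch_based
  // reclaimer that ships with xenium (any reclaimer policy works):
  //
  //   using queue_t = xenium::kirsch_kfifo_queue<
  //     int*,
  //     xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>>;
  //
  //   queue_t queue(8);        // each segment holds k = 8 slots
  //   queue.push(new int(42)); // the queue stores raw pointers
  //   int* v = nullptr;
  //   if (queue.try_pop(v)) {
  //     delete v;              // ownership is back with the caller
  //   }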
private:
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // std::max avoids a zero-sized array, which is ill-formed
    char padding[std::max(padding_bytes, 1u)];
  };
  struct unpadded_entry {
    std::atomic<marked_value> value;
  };
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;
  struct segment;

  struct segment_deleter {
    void operator()(segment* seg) const { release_segment(seg); }
  };
  struct segment : reclaimer::template enable_concurrent_ptr<segment, 16, segment_deleter> {
    using concurrent_ptr = typename reclaimer::template concurrent_ptr<segment, 16>;
    explicit segment(uint64_t k) : k(k) {}
    ~segment() override {
      for (unsigned i = 0; i < k; ++i) {
        assert(items()[i].value.load(std::memory_order_relaxed).get() == nullptr);
      }
    }
    void delete_remaining_items() {
      for (unsigned i = 0; i < k; ++i) {
        traits::delete_value(items()[i].value.load(std::memory_order_relaxed).get());
        items()[i].value.store(nullptr, std::memory_order_relaxed);
      }
    }
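
    // The k entries live in the same allocation, immediately after the
    // segment header (see alloc_segment), so items() simply points past this.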
    entry* items() noexcept { return reinterpret_cast<entry*>(this + 1); }

    std::atomic<bool> deleted{false};
    const uint64_t k;
    concurrent_ptr next{};
  };
  using concurrent_ptr = typename segment::concurrent_ptr;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;
  segment* alloc_segment() const;
  static void release_segment(segment* seg);
  template <bool Empty>
  bool find_index(marked_ptr segment, uint64_t& value_index, marked_value& old) const noexcept;
  void advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept;
  void advance_tail(marked_ptr tail_current) noexcept;
  bool committed(marked_ptr segment, marked_value value, uint64_t index) noexcept;
  const std::size_t k_;
  concurrent_ptr head_;
  concurrent_ptr tail_;
};
template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::kirsch_kfifo_queue(uint64_t k) : k_(k) {
  const auto seg = alloc_segment();
  head_.store(seg, std::memory_order_relaxed);
  tail_.store(seg, std::memory_order_relaxed);
}
template <class T, class... Policies>
kirsch_kfifo_queue<T, Policies...>::~kirsch_kfifo_queue() {
  auto seg = head_.load(std::memory_order_relaxed).get();
  while (seg) {
    auto next = seg->next.load(std::memory_order_relaxed).get();
    seg->delete_remaining_items();
    release_segment(seg);
    seg = next;
  }
}
template <class T, class... Policies>
auto kirsch_kfifo_queue<T, Policies...>::alloc_segment() const -> segment* {
  // allocate the segment header and its k entries as a single block
  void* data = ::operator new(sizeof(segment) + k_ * sizeof(entry));
  auto result = new (data) segment(k_);
  for (std::size_t i = 0; i < k_; ++i) {
    new (&result->items()[i]) entry();
  }
  return result;
}
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::release_segment(segment* seg) {
  // segments are constructed with placement new, so destroy them explicitly
  seg->~segment();
  ::operator delete(seg);
}
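
// push: pick a random empty slot in the tail segment and CAS the value in;
// committed() then checks that the inserted value is actually visible in the
// queue. If the tail segment is full, advance the tail and retry.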
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::push(value_type value) {
  if (value == nullptr) {
    throw std::invalid_argument("value cannot be nullptr");
  }

  raw_value_type raw_value = traits::get_raw(value);
  guard_ptr tail_old;
  while (true) {
    tail_old.acquire(tail_, std::memory_order_acquire);

    uint64_t idx = 0;
    marked_value old_value;
    bool found_idx = find_index<true>(tail_old, idx, old_value);
    if (tail_old != tail_.load(std::memory_order_relaxed)) {
      continue;
    }

    if (found_idx) {
      const marked_value new_value(raw_value, old_value.mark() + 1);
      if (tail_old->items()[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        return;
      }
    } else {
      advance_tail(tail_old);
    }
  }
}
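
// try_pop: pick a random occupied slot in the head segment and CAS it back to
// nullptr (with an incremented mark). If the head segment is empty but the
// queue is not, advance the head and retry.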
template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  guard_ptr head_old;
  while (true) {
    head_old.acquire(head_, std::memory_order_acquire);
    auto h = head_old.get();
    (void)h; // suppress unused-variable warning

    uint64_t idx = 0;
    marked_value old_value;
    bool found_idx = find_index<false>(head_old, idx, old_value);
    if (head_old != head_.load(std::memory_order_relaxed)) {
      continue;
    }

    marked_ptr tail_old = tail_.load(std::memory_order_acquire);
    if (found_idx) {
      assert(old_value.get() != (void*)0x100);
      if (head_old.get() == tail_old.get()) {
        advance_tail(tail_old);
      }

      const marked_value new_value(nullptr, old_value.mark() + 1);
      if (head_old->items()[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_acquire, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == tail_.load(std::memory_order_relaxed)) {
        return false; // the queue is empty
      }
      advance_head(head_old, tail_old);
    }
  }
}
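
// find_index: scan the segment's k slots, starting at a random offset, for an
// empty slot (Empty == true, used by push) or an occupied slot
// (Empty == false, used by try_pop).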
template <class T, class... Policies>
template <bool Empty>
bool kirsch_kfifo_queue<T, Policies...>::find_index(
  marked_ptr segment,
  uint64_t& value_index,
  marked_value& old) const noexcept {
  const uint64_t k = segment->k;
  const uint64_t random_index = utils::random() % k;
  for (size_t i = 0; i < k; i++) {
    uint64_t index = ((random_index + i) % k);
    old = segment->items()[index].value.load(std::memory_order_relaxed);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}
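
// committed: verify that a freshly inserted value is actually visible in the
// queue; the segment may have been removed concurrently. If it is not
// visible, try to revert the insert; a failed revert means a dequeuer already
// took the value, so the insert counts as committed after all.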
template <class T, class... Policies>
bool kirsch_kfifo_queue<T, Policies...>::committed(marked_ptr segment, marked_value value, uint64_t index) noexcept {
  if (value != segment->items()[index].value.load(std::memory_order_relaxed)) {
    // the slot no longer holds our value -> a dequeuer already took it
    return true;
  }

  const marked_value empty_value(nullptr, value.mark() + 1);

  if (segment->deleted.load(std::memory_order_relaxed)) {
    // the segment was removed -> try to revert the insert
    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  marked_ptr head_current = head_.load(std::memory_order_acquire);
  if (segment.get() == head_current.get()) {
    // the segment is the head segment -> bump the head's version tag so a
    // concurrent advance_head cannot remove it underneath us
    marked_ptr new_head(head_current.get(), head_current.mark() + 1);
    if (head_.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed)) {
      return true;
    }
    return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
  }

  if (!segment->deleted.load(std::memory_order_relaxed)) {
    // the segment is neither removed nor the head segment -> the insert is visible
    return true;
  }
  return !segment->items()[index].value.compare_exchange_strong(value, empty_value, std::memory_order_relaxed);
}
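
// advance_head: move head_ to the next segment and mark the old head segment
// as deleted. If head and tail refer to the same segment, tail_ must be
// advanced first so the tail never lags behind the head.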
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_head(guard_ptr& head_current, marked_ptr tail_current) noexcept {
  const marked_ptr head_next_segment = head_current->next.load(std::memory_order_acquire);
  if (head_current != head_.load(std::memory_order_relaxed)) {
    return;
  }

  if (head_current.get() == tail_current.get()) {
    const marked_ptr tail_next_segment = tail_current->next.load(std::memory_order_acquire);
    if (tail_next_segment.get() == nullptr) {
      return; // there is no other segment; the queue would become empty
    }

    if (tail_current == tail_.load(std::memory_order_relaxed)) {
      marked_ptr new_tail(tail_next_segment.get(), tail_current.mark() + 1);
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    }
  }

  head_current->deleted.store(true, std::memory_order_relaxed);

  marked_ptr expected = head_current;
  marked_ptr new_head(head_next_segment.get(), head_current.mark() + 1);
  if (head_.compare_exchange_strong(expected, new_head, std::memory_order_release, std::memory_order_relaxed)) {
    head_current.reclaim();
  }
}
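
// advance_tail: move tail_ to the next segment, allocating and linking a
// fresh segment if the current tail has no successor yet.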
template <class T, class... Policies>
void kirsch_kfifo_queue<T, Policies...>::advance_tail(marked_ptr tail_current) noexcept {
  marked_ptr next_segment = tail_current->next.load(std::memory_order_acquire);
  if (tail_current != tail_.load(std::memory_order_relaxed)) {
    return;
  }

  if (next_segment.get() != nullptr) {
    marked_ptr new_tail(next_segment.get(), next_segment.mark() + 1);
    tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
  } else {
    auto seg = alloc_segment();
    const marked_ptr new_segment(seg, next_segment.mark() + 1);
    if (tail_current->next.compare_exchange_strong(
          next_segment, new_segment, std::memory_order_release, std::memory_order_relaxed)) {
      marked_ptr new_tail(seg, tail_current.mark() + 1);
      tail_.compare_exchange_strong(tail_current, new_tail, std::memory_order_release, std::memory_order_relaxed);
    } else {
      // another thread linked a new segment first; free ours
      release_segment(seg);
    }
  }
}

} // namespace xenium

#endif