#ifndef XENIUM_DETAIL_NIKOLAEV_SCQ_HPP
#define XENIUM_DETAIL_NIKOLAEV_SCQ_HPP
#include "xenium/utils.hpp"

#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
namespace xenium::detail {

struct nikolaev_scq {
  struct empty_tag {};
  struct full_tag {};
  struct first_used_tag {};
  struct first_empty_tag {};
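
  // Tag types for the four constructors below; they select the initial state of
  // the ring: completely empty, completely full, only the first slot occupied,
  // or everything but the first slot occupied.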
  nikolaev_scq(std::size_t capacity, std::size_t remap_shift, empty_tag);
  nikolaev_scq(std::size_t capacity, std::size_t remap_shift, full_tag);
  nikolaev_scq(std::size_t capacity, std::size_t remap_shift, first_used_tag);
  nikolaev_scq(std::size_t capacity, std::size_t remap_shift, first_empty_tag);
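
  // When `Nonempty` is set, the owning queue guarantees that this ring can never
  // be observed empty, so all `_threshold` bookkeeping is skipped. `Finalizable`
  // enables the finalized-bit check in enqueue; `PopRetries` bounds how long a
  // dequeue spins on a slot that a concurrent enqueue is about to fill.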
  template <bool Nonempty, bool Finalizable>
  bool enqueue(std::uint64_t value, std::size_t capacity, std::size_t remap_shift);
  template <bool Nonempty, std::size_t PopRetries>
  bool dequeue(std::uint64_t& value, std::size_t capacity, std::size_t remap_shift);
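
  // Finalization sets the least significant bit of _tail; enqueue operations
  // instantiated with Finalizable observe this bit and fail.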
  void finalize() { _tail.fetch_or(1, std::memory_order_relaxed); }
  void set_threshold(std::int64_t v) { _threshold.store(v, std::memory_order_relaxed); }
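
  // The remap shift spreads logically adjacent indexes across cache lines (see
  // remap_index). E.g., with 8 indexes per 64-byte cache line and
  // utils::find_last_bit_set returning the 1-based position of the highest set
  // bit, a capacity of 64 yields a shift of 4, satisfying the assertion in
  // remap_index for the ring size n = 2 * capacity = 128.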
  static constexpr std::size_t calc_remap_shift(std::size_t capacity) {
    assert(utils::is_power_of_two(capacity));
    return utils::find_last_bit_set(capacity / indexes_per_cacheline);
  }

private:
  using index_t = std::uint64_t;
  using indexdiff_t = std::int64_t;
  using value_t = std::uint64_t;
  static constexpr std::size_t cacheline_size = 64;
  static constexpr std::size_t indexes_per_cacheline = cacheline_size / sizeof(index_t);
  void catchup(std::uint64_t tail, std::uint64_t head);
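
  // Signed distance between two indexes; the cast keeps comparisons correct
  // even once the unsigned counters wrap around.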
  static inline indexdiff_t diff(index_t a, index_t b) { return static_cast<indexdiff_t>(a - b); }
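
  // Remap an index so that logically consecutive slots land on different cache
  // lines, avoiding false sharing between neighbouring enqueue/dequeue calls.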
  static inline index_t remap_index(index_t idx, std::size_t remap_shift, std::size_t n) {
    assert(remap_shift == 0 || (1 << remap_shift) * indexes_per_cacheline == n);
    return ((idx & (n - 1)) >> remap_shift) | ((idx * indexes_per_cacheline) & (n - 1));
  }
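
  // _head, _threshold, _tail and _data live on separate cache lines to avoid
  // false sharing between producers and consumers.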
  std::atomic<index_t> _head;
  alignas(64) std::atomic<std::int64_t> _threshold;
  alignas(64) std::atomic<index_t> _tail;
  alignas(64) std::unique_ptr<std::atomic<std::uint64_t>[]> _data;
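
  // Indexes advance in steps of two; the least significant bit of _tail is
  // reserved for the finalized flag.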
  static constexpr index_t finalized = 1;
  static constexpr index_t index_inc = 2;
};
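
// The constructors differ only in the initial head/tail/threshold values and in
// which slots start out holding an index instead of the empty marker (-1).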
inline nikolaev_scq::nikolaev_scq(std::size_t capacity, std::size_t remap_shift, empty_tag) :
    _head(0),
    _threshold(-1),
    _tail(0),
    _data(new std::atomic<index_t>[capacity * 2]) {
  const auto n = capacity * 2;
  for (std::size_t i = 0; i < n; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  }
}
inline nikolaev_scq::nikolaev_scq(std::size_t capacity, std::size_t remap_shift, full_tag) :
    _head(0),
    _threshold(static_cast<std::int64_t>(capacity) * 3 - 1),
    _tail(capacity * index_inc),
    _data(new std::atomic<index_t>[capacity * 2]) {
  const auto n = capacity * 2;
  for (std::size_t i = 0; i < capacity; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(n + i, std::memory_order_relaxed);
  }
  for (std::size_t i = capacity; i < n; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  }
}
inline nikolaev_scq::nikolaev_scq(std::size_t capacity, std::size_t remap_shift, first_used_tag) :
    _head(0),
    _threshold(static_cast<std::int64_t>(capacity) * 3 - 1),
    _tail(index_inc),
    _data(new std::atomic<index_t>[capacity * 2]) {
  const auto n = capacity * 2;
  _data[remap_index(0, remap_shift, n)].store(n, std::memory_order_relaxed);
  for (std::size_t i = 1; i < n; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  }
}
inline nikolaev_scq::nikolaev_scq(std::size_t capacity, std::size_t remap_shift, first_empty_tag) :
    _head(index_inc),
    _threshold(static_cast<std::int64_t>(capacity) * 3 - 1),
    _tail(capacity * index_inc),
    _data(new std::atomic<index_t>[capacity * 2]) {
  const auto n = capacity * 2;
  _data[remap_index(0, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  for (std::size_t i = 1; i < capacity; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(n + i, std::memory_order_relaxed);
  }
  for (std::size_t i = capacity; i < n; ++i) {
    _data[remap_index(i << 1, remap_shift, n)].store(static_cast<index_t>(-1), std::memory_order_relaxed);
  }
}
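
// Enqueue (cf. Nikolaev's SCQ algorithm): reserve a tail index with a relaxed
// fetch_add, then try to publish the value into the remapped slot with a CAS.
// If the slot cannot accept the value (its cycle is too new, or it still holds
// unconsumed data), a fresh tail index is reserved and the operation retries.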
template <bool Nonempty, bool Finalizable>
inline bool nikolaev_scq::enqueue(std::uint64_t value, std::size_t capacity, std::size_t remap_shift) {
  assert(value < capacity);
  const std::size_t n = capacity * 2;
  const std::size_t is_safe_and_value_mask = 2 * n - 1;
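
  // Pre-inverting the value against the mask lets the CAS below store
  // `tail_cycle ^ value`, which combines the tail's cycle bits with the original
  // value in a single word (the low bits of tail_cycle are all ones).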
  value ^= is_safe_and_value_mask;

  for (;;) {
    auto tail = _tail.fetch_add(index_inc, std::memory_order_relaxed);
    if constexpr (Finalizable) {
      if (tail & finalized) {
        return false; // the queue has been finalized
      }
    } else {
      assert((tail & finalized) == 0);
    }
    const auto tail_cycle = tail | is_safe_and_value_mask;
    const auto tidx = remap_index(tail, remap_shift, n);
    auto entry = _data[tidx].load(std::memory_order_acquire);

  retry:
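    // The slot can take the value only if its stored cycle is older than the
    // tail's cycle and it holds no unconsumed value: either the all-ones
    // "empty" pattern, or the variant with the safe bit (n) cleared, which
    // additionally requires that the head has not yet passed this tail index.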
    const auto entry_cycle = entry | is_safe_and_value_mask;
    if (diff(entry_cycle, tail_cycle) < 0 &&
        (entry == entry_cycle ||
         (entry == (entry_cycle ^ n) && diff(_head.load(std::memory_order_relaxed), tail) <= 0))) {
      if (!_data[tidx].compare_exchange_weak(
            entry, tail_cycle ^ value, std::memory_order_release, std::memory_order_relaxed)) {
        goto retry;
      }
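
      // A successful enqueue resets the threshold to its maximum,
      // n + capacity - 1 (= 3 * capacity - 1), which bounds the number of
      // unsuccessful dequeue iterations before the queue is reported empty.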
      const auto threshold = static_cast<std::int64_t>(n + capacity - 1);
      if constexpr (!Nonempty) {
        if (_threshold.load(std::memory_order_relaxed) != threshold) {
          _threshold.store(threshold, std::memory_order_relaxed);
        }
      }
      return true;
    }
  }
}
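
// Dequeue: reserve a head index and inspect the remapped slot. A value from the
// matching cycle is consumed; otherwise the slot is invalidated or its safe bit
// is cleared, so that a lagging enqueue cannot publish into an index the
// consumer has already given up on.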
template <bool Nonempty, std::size_t PopRetries>
inline bool nikolaev_scq::dequeue(std::uint64_t& value, std::size_t capacity, std::size_t remap_shift) {
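  // Fast path: a negative threshold means the queue is certainly empty.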
  if constexpr (!Nonempty) {
    if (_threshold.load(std::memory_order_relaxed) < 0) {
      return false;
    }
  }

  const std::size_t n = capacity * 2;
  const std::size_t value_mask = n - 1;
  const std::size_t is_safe_and_value_mask = 2 * n - 1;

  for (;;) {
    const auto head = _head.fetch_add(index_inc, std::memory_order_relaxed);
    assert((head & finalized) == 0);
    const auto head_cycle = head | is_safe_and_value_mask;
    const auto hidx = remap_index(head, remap_shift, n);
    std::size_t attempt = 0;
    std::uint64_t entry_cycle;
    std::uint64_t entry_new;
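
    // Either consume a matching entry or CAS the slot into a state that a
    // lagging enqueue will detect (invalidated for the head's cycle, or with
    // its safe bit cleared).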
  retry:
    auto entry = _data[hidx].load(std::memory_order_acquire);
    do {
      entry_cycle = entry | is_safe_and_value_mask;
      if (entry_cycle == head_cycle) {
        // the entry belongs to the current cycle -> consume it
        _data[hidx].fetch_or(value_mask, std::memory_order_release);
        value = entry & value_mask;
        assert(value < capacity);
        return true;
      }

      if ((entry | n) != entry_cycle) {
        entry_new = entry & ~n;
        if (entry == entry_new) {
          break;
        }
      } else {
        auto tail = _tail.load(std::memory_order_relaxed);
        if (diff(tail, head + index_inc) > 0 && ++attempt <= PopRetries) {
          goto retry; // an enqueue may be about to fill this slot
        }
        assert((head_cycle & is_safe_and_value_mask) == is_safe_and_value_mask);
        entry_new = head_cycle;
      }
    } while (
      diff(entry_cycle, head_cycle) < 0 &&
      !_data[hidx].compare_exchange_weak(entry, entry_new, std::memory_order_acq_rel, std::memory_order_acquire));

    if constexpr (!Nonempty) {
      auto tail = _tail.load(std::memory_order_relaxed);
      if (diff(tail, head + index_inc) <= 0) {
        catchup(tail, head + index_inc);
        _threshold.fetch_sub(1, std::memory_order_relaxed);
        return false;
      }

      if (_threshold.fetch_sub(1, std::memory_order_relaxed) <= 0) {
        return false;
      }
    }
  }
}
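
// After a dequeue has observed tail <= head on an empty slot, catchup advances
// _tail to the given head index so that _tail does not lag behind _head
// indefinitely.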
inline void nikolaev_scq::catchup(std::uint64_t tail, std::uint64_t head) {
  while (!_tail.compare_exchange_weak(tail, head, std::memory_order_relaxed)) {
    head = _head.load(std::memory_order_relaxed);
    if (diff(tail, head) >= 0) {
      break;
    }
  }
}

} // namespace xenium::detail

#endif