#ifndef XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP
#define XENIUM_KIRSCH_BOUNDED_KFIFO_QUEUE_HPP

#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/utils.hpp>

#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <type_traits>
#ifdef _MSC_VER
  #pragma warning(push)
  #pragma warning(disable : 26495) // uninitialized member variable
#endif

namespace xenium {
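// A bounded, array-based, lock-free k-FIFO queue: the ring buffer is split into
// segments of k slots; pushes scan the current tail segment for a free slot and
// pops scan the current head segment for a filled slot, so up to k operations can
// proceed in parallel while elements may be dequeued up to k-1 positions out of
// strict FIFO order.
//
// Minimal usage sketch (assuming T is instantiated with a raw pointer type):
//
//   xenium::kirsch_bounded_kfifo_queue<int*> queue(4, 100); // k = 4, 100 segments
//   queue.try_push(new int(42));
//   int* value;
//   if (queue.try_pop(value)) { /* use value */ delete value; }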
template <class T, class... Policies>
class kirsch_bounded_kfifo_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;

public:
  using value_type = typename traits::value_type;

  static constexpr unsigned padding_bytes =
    parameter::value_param_t<unsigned, policy::padding_bytes, sizeof(raw_value_type), Policies...>::value;

  kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments);
  ~kirsch_bounded_kfifo_queue();

  bool try_push(value_type value);
  bool try_pop(value_type& result);

private:
  // Each slot stores the value pointer together with a version mark to avoid ABA problems.
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 16>;

  struct padded_entry {
    std::atomic<marked_value> value;
    // std::max avoids a zero-sized array in case padding_bytes is 0.
    char padding[std::max(padding_bytes, 1U)];
  };

  struct unpadded_entry {
    std::atomic<marked_value> value;
  };
  using entry = std::conditional_t<padding_bytes == 0, unpadded_entry, padded_entry>;

  // A queue index packed together with a version mark into a single 64-bit word,
  // so that index and mark can be updated with one atomic CAS.
  struct marked_idx {
    marked_idx() = default;
    marked_idx(uint64_t val, uint64_t mark) noexcept { _val = val | (mark << bits); }

    [[nodiscard]] uint64_t get() const noexcept { return _val & val_mask; }
    [[nodiscard]] uint64_t mark() const noexcept { return _val >> bits; }
    bool operator==(const marked_idx& other) const noexcept { return this->_val == other._val; }
    bool operator!=(const marked_idx& other) const noexcept { return this->_val != other._val; }

  private:
    static constexpr unsigned bits = 16;
    static constexpr uint64_t val_mask = (static_cast<uint64_t>(1) << bits) - 1;
    uint64_t _val = 0;
  };

  template <bool Empty>
  bool find_index(uint64_t start_index, uint64_t& index, marked_value& old);
  bool queue_full(const marked_idx& head_old, const marked_idx& tail_old) const;
  bool segment_empty(const marked_idx& head_old) const;
  [[nodiscard]] bool not_in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  [[nodiscard]] bool in_valid_region(uint64_t tail_old, uint64_t tail_current, uint64_t head_current) const;
  bool committed(const marked_idx& tail_old, marked_value new_value, uint64_t index);

  std::uint64_t _queue_size;
  std::uint64_t _k;
  std::atomic<marked_idx> _head;
  std::atomic<marked_idx> _tail;
  std::unique_ptr<entry[]> _queue;
};

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::kirsch_bounded_kfifo_queue(uint64_t k, uint64_t num_segments) :
  _queue_size(k * num_segments),
  _k(k),
  _head(marked_idx(0, 0)),
  _tail(marked_idx(k, 0)),
  _queue(new entry[k * num_segments]()) {}

template <class T, class... Policies>
kirsch_bounded_kfifo_queue<T, Policies...>::~kirsch_bounded_kfifo_queue() {
  for (unsigned i = 0; i < _queue_size; ++i) {
    traits::delete_value(_queue[i].value.load(std::memory_order_relaxed).get());
  }
}

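// Push: search the tail segment for an empty slot (in random order); if one is found,
// publish the value with a CAS and then check via committed() that the slot still lies
// within the valid head/tail region. If the tail segment is full, either report a full
// queue or advance the tail (and possibly the head) by one segment and retry.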
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_push(value_type value) {
  if (value == nullptr) {
    throw std::invalid_argument("value can not be nullptr");
  }

  raw_value_type raw_value = traits::get_raw(value);
  for (;;) {
    marked_idx tail_old = _tail.load(std::memory_order_relaxed);
    marked_idx head_old = _head.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<true>(tail_old.get(), idx, old_value);
    if (tail_old != _tail.load(std::memory_order_relaxed)) {
      continue; // the tail has moved - retry
    }

    if (found_idx) {
      assert(old_value.get() == nullptr);
      const marked_value new_value(raw_value, old_value.mark() + 1);
      if (_queue[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed) &&
          committed(tail_old, new_value, idx)) {
        traits::release(value);
        return true;
      }
    } else {
      if (queue_full(head_old, tail_old)) {
        if (segment_empty(head_old)) {
          // the head segment is empty - move the head forward by one segment
          marked_idx new_head((head_old.get() + _k) % _queue_size, head_old.mark() + 1);
          _head.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
        } else if (head_old == _head.load(std::memory_order_relaxed)) {
          return false; // the queue is full
        }
      }
      // the tail segment has no free slot - move the tail forward by one segment
      marked_idx new_tail((tail_old.get() + _k) % _queue_size, tail_old.mark() + 1);
      _tail.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
    }
  }
}

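// Pop: search the head segment for a non-empty slot (in random order); if one is found,
// empty it with a CAS and hand the value to the caller. If the head segment contains no
// item, the queue is either empty or the head is advanced by one segment and we retry.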
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::try_pop(value_type& result) {
  for (;;) {
    marked_idx head_old = _head.load(std::memory_order_relaxed);
    marked_idx tail_old = _tail.load(std::memory_order_relaxed);

    uint64_t idx;
    marked_value old_value;
    bool found_idx = find_index<false>(head_old.get(), idx, old_value);
    if (head_old != _head.load(std::memory_order_relaxed)) {
      continue; // the head has moved - retry
    }

    if (found_idx) {
      assert(old_value.get() != nullptr);
      if (head_old.get() == tail_old.get()) {
        marked_idx new_tail((tail_old.get() + _k) % _queue_size, tail_old.mark() + 1);
        _tail.compare_exchange_strong(tail_old, new_tail, std::memory_order_relaxed);
      }

      marked_value new_value(nullptr, old_value.mark() + 1);
      if (_queue[idx].value.compare_exchange_strong(
            old_value, new_value, std::memory_order_release, std::memory_order_relaxed)) {
        traits::store(result, old_value.get());
        return true;
      }
    } else {
      if (head_old.get() == tail_old.get() && tail_old == _tail.load(std::memory_order_relaxed)) {
        return false; // the queue is empty
      }
      // the head segment contains no item - move the head forward by one segment
      marked_idx new_head((head_old.get() + _k) % _queue_size, head_old.mark() + 1);
      _head.compare_exchange_strong(head_old, new_head, std::memory_order_relaxed);
    }
  }
}

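// Scans the k slots of the segment starting at start_index, beginning at a random offset
// to reduce contention. Looks for an empty slot if Empty is true, otherwise for a filled
// one; on success stores the slot's index and its current marked value.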
template <class T, class... Policies>
template <bool Empty>
bool kirsch_bounded_kfifo_queue<T, Policies...>::find_index(uint64_t start_index,
                                                            uint64_t& value_index,
                                                            marked_value& old) {
  const uint64_t random_index = utils::random() % _k;
  for (size_t i = 0; i < _k; i++) {
    uint64_t index = (start_index + ((random_index + i) % _k)) % _queue_size;
    old = _queue[index].value.load(std::memory_order_acquire);
    if ((Empty && old.get() == nullptr) || (!Empty && old.get() != nullptr)) {
      value_index = index;
      return true;
    }
  }
  return false;
}

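// Called after a successful push-CAS: decides whether the freshly inserted value counts
// as committed. It is committed if it has already been consumed again, or if its slot
// still lies in the valid region between head and tail; otherwise the insert is annulled
// (reset to nullptr) unless a concurrent pop got to it first.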
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::committed(const marked_idx& tail_old,
                                                           marked_value value,
                                                           uint64_t index) {
  if (_queue[index].value.load(std::memory_order_relaxed) != value) {
    // the entry has already been changed by a concurrent operation - the push counts as committed
    return true;
  }

  marked_idx tail_current = _tail.load(std::memory_order_relaxed);
  marked_idx head_current = _head.load(std::memory_order_relaxed);
  if (in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    return true;
  }

  if (not_in_valid_region(tail_old.get(), tail_current.get(), head_current.get())) {
    // the slot is no longer in the valid region - try to take the value out again
    marked_value new_value(nullptr, value.mark() + 1);
    if (!_queue[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed)) {
      return true; // a concurrent pop already took the value
    }
  } else {
    marked_idx new_head(head_current.get(), head_current.mark() + 1);
    if (_head.compare_exchange_strong(head_current, new_head, std::memory_order_relaxed)) {
      return true;
    }

    marked_value new_value(nullptr, value.mark() + 1);
    if (!_queue[index].value.compare_exchange_strong(value, new_value, std::memory_order_relaxed)) {
      return true;
    }
  }
  return false;
}

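// The queue is full if the segment after the tail segment is the head segment
// (and the head has not moved in the meantime).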
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::queue_full(const marked_idx& head_old,
                                                            const marked_idx& tail_old) const {
  return (((tail_old.get() + _k) % _queue_size) == head_old.get() &&
          (head_old == _head.load(std::memory_order_relaxed)));
}

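// Returns true if none of the k slots of the head segment holds a value.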
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::segment_empty(const marked_idx& head_old) const {
  const uint64_t start = head_old.get();
  for (size_t i = 0; i < _k; i++) {
    if (_queue[(start + i) % _queue_size].value.load(std::memory_order_acquire).get() != nullptr) {
      return false;
    }
  }
  return true;
}

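// in_valid_region/not_in_valid_region check whether a slot inserted while the tail was at
// tail_old still lies between the current head and tail, taking wrap-around of the ring
// buffer into account.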
template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::in_valid_region(uint64_t tail_old,
                                                                 uint64_t tail_current,
                                                                 uint64_t head_current) const {
  bool wrap_around = tail_current < head_current;
  if (!wrap_around) {
    return head_current < tail_old && tail_old <= tail_current;
  }
  return head_current < tail_old || tail_old <= tail_current;
}

template <class T, class... Policies>
bool kirsch_bounded_kfifo_queue<T, Policies...>::not_in_valid_region(uint64_t tail_old,
                                                                     uint64_t tail_current,
                                                                     uint64_t head_current) const {
  bool wrap_around = tail_current < head_current;
  if (!wrap_around) {
    return tail_old < tail_current || head_current < tail_old;
  }
  return tail_old < tail_current && head_current < tail_old;
}