//
// Copyright (c) 2018-2020 Manuel Pöter.
// Licensed under the MIT License. See LICENSE file in the project root for full license information.
//

#ifndef XENIUM_RAMALHETE_QUEUE_HPP
#define XENIUM_RAMALHETE_QUEUE_HPP

#include <xenium/acquire_guard.hpp>
#include <xenium/backoff.hpp>
#include <xenium/marked_ptr.hpp>
#include <xenium/parameter.hpp>
#include <xenium/policy.hpp>
#include <xenium/detail/pointer_queue_traits.hpp>

#include <algorithm>
#include <atomic>
#include <stdexcept>

#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to alignment specifier
#endif

namespace xenium {

namespace policy {
  // Policy to configure the number of iterations to spin on a queue entry while
  // waiting for a pending push operation to finish.
  template <unsigned Value>
  struct pop_retries;
}

// A fast unbounded lock-free multi-producer/multi-consumer FIFO queue.
template <class T, class... Policies>
class ramalhete_queue {
private:
  using traits = detail::pointer_queue_traits_t<T, Policies...>;
  using raw_value_type = typename traits::raw_type;
public:
  using value_type = T;
  using reclaimer = parameter::type_param_t<policy::reclaimer, parameter::nil, Policies...>;
  using backoff = parameter::type_param_t<policy::backoff, no_backoff, Policies...>;
  static constexpr unsigned entries_per_node = parameter::value_param_t<unsigned, policy::entries_per_node, 512, Policies...>::value;
  static constexpr unsigned pop_retries = parameter::value_param_t<unsigned, policy::pop_retries, 1000, Policies...>::value;

  static_assert(entries_per_node > 0, "entries_per_node must be greater than zero");
  static_assert(parameter::is_set<reclaimer>::value, "reclaimer policy must be specified");

  template <class... NewPolicies>
  using with = ramalhete_queue<T, NewPolicies..., Policies...>;

  ramalhete_queue();
  ~ramalhete_queue();

  // Pushes the given value to the queue.
  void push(value_type value);

  // Tries to pop an object from the queue.
  [[nodiscard]] bool try_pop(value_type &result);

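  // Illustrative usage sketch (not part of this header). The queue stores pointer-like
  // values and requires a reclaimer policy; the reclamation scheme shown here
  // (xenium::reclamation::generic_epoch_based<> from
  // <xenium/reclamation/generic_epoch_based.hpp>) is only an assumed example and can be
  // replaced by any other reclaimer provided by the library.
  //
  //   using queue_t = xenium::ramalhete_queue<
  //     int*,
  //     xenium::policy::reclaimer<xenium::reclamation::generic_epoch_based<>>,
  //     xenium::policy::entries_per_node<1024>>;
  //
  //   queue_t q;
  //   q.push(new int(42));
  //   int* v = nullptr;
  //   if (q.try_pop(v)) {
  //     // use *v, then free it
  //     delete v;
  //   }
  //
  // Individual policies can also be overridden on an existing configuration via the
  // `with` alias, e.g. queue_t::with<xenium::policy::pop_retries<0>>.
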
private:
  struct node;

  using concurrent_ptr = typename reclaimer::template concurrent_ptr<node, 0>;
  using marked_ptr = typename concurrent_ptr::marked_ptr;
  using guard_ptr = typename concurrent_ptr::guard_ptr;

  // TODO - use type from traits
  using marked_value = xenium::marked_ptr<std::remove_pointer_t<raw_value_type>, 1>;

  struct entry {
    std::atomic<marked_value> value;
  };

  // TODO - make this configurable via policy.
  static constexpr unsigned step_size = 11;
  static constexpr unsigned max_idx = step_size * entries_per_node;

  struct node : reclaimer::template enable_concurrent_ptr<node> {
    // pop_idx and push_idx are incremented by step_size to avoid false sharing, so the
    // actual index has to be calculated modulo entries_per_node.
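    // Worked example of the mapping (a sketch, assuming the default step_size = 11,
    // entries_per_node = 512, 8-byte entries and 64-byte cache lines): successive
    // fetch_add results 0, 11, 22, ... map to slots 0, 11, 22, ..., 506, 5, 16, ...,
    // so consecutive operations usually touch different cache lines, and the node is
    // exhausted once an index reaches max_idx (11 * 512 = 5632).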
    std::atomic<unsigned> pop_idx;
    entry entries[entries_per_node];
    std::atomic<unsigned> push_idx;
    concurrent_ptr next;

    // Start with the first entry pre-filled.
    node(raw_value_type item) :
      pop_idx{0},
      push_idx{step_size},
      next{nullptr}
    {
      entries[0].value.store(item, std::memory_order_relaxed);
      for (unsigned i = 1; i < entries_per_node; i++)
        entries[i].value.store(nullptr, std::memory_order_relaxed);
    }

    ~node() {
      for (unsigned i = pop_idx; i < push_idx; i += step_size) {
        traits::delete_value(entries[i % entries_per_node].value.load(std::memory_order_relaxed).get());
      }
    }
  };

  alignas(64) concurrent_ptr head;
  alignas(64) concurrent_ptr tail;
};

template <class T, class... Policies>
ramalhete_queue<T, Policies...>::ramalhete_queue()
{
  // The node constructor pre-fills the first entry; the initial dummy node holds no
  // value, so push_idx is reset to zero to make the node appear empty.
  auto n = new node(nullptr);
  n->push_idx.store(0, std::memory_order_relaxed);
  head.store(n, std::memory_order_relaxed);
  tail.store(n, std::memory_order_relaxed);
}

template <class T, class... Policies>
ramalhete_queue<T, Policies...>::~ramalhete_queue()
{
  // (1) - this acquire-load synchronizes-with the release-CAS (13)
  auto n = head.load(std::memory_order_acquire);
  while (n)
  {
    // (2) - this acquire-load synchronizes-with the release-CAS (4)
    auto next = n->next.load(std::memory_order_acquire);
    delete n.get();
    n = next;
  }
}

template <class T, class... Policies>
void ramalhete_queue<T, Policies...>::push(value_type value)
{
  raw_value_type raw_val = traits::get_raw(value);
  if (raw_val == nullptr)
    throw std::invalid_argument("value can not be nullptr");

  backoff backoff;
  guard_ptr t;
  for (;;) {
    // (3) - this acquire-load synchronizes-with the release-CAS (5, 7)
    t.acquire(tail, std::memory_order_acquire);

    unsigned idx = t->push_idx.fetch_add(step_size, std::memory_order_relaxed);
    if (idx >= max_idx) {
      // This node is full.
      if (t != tail.load(std::memory_order_relaxed))
        continue; // some other thread already added a new node.

      auto next = t->next.load(std::memory_order_relaxed);
      if (next == nullptr)
      {
        node* new_node = new node(raw_val);
        traits::release(value);

        marked_ptr expected = nullptr;
        // (4) - this release-CAS synchronizes-with the acquire-load (2, 6, 12)
        if (t->next.compare_exchange_strong(expected, new_node,
                                            std::memory_order_release,
                                            std::memory_order_relaxed))
        {
          expected = t;
          // (5) - this release-CAS synchronizes-with the acquire-load (3)
          tail.compare_exchange_strong(expected, new_node, std::memory_order_release, std::memory_order_relaxed);
          return;
        }
        // Prevent the pre-stored value from being deleted by the node destructor.
        new_node->push_idx.store(0, std::memory_order_relaxed);
        // Some other thread already added a new node.
        delete new_node;
      } else {
        // (6) - this acquire-load synchronizes-with the release-CAS (4)
        next = t->next.load(std::memory_order_acquire);
        marked_ptr expected = t;
        // (7) - this release-CAS synchronizes-with the acquire-load (3)
        tail.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed);
      }
      continue;
    }
    idx %= entries_per_node;

    marked_value expected = nullptr;
    // (8) - this release-CAS synchronizes-with the acquire-load (14) and the acquire-exchange (15)
    if (t->entries[idx].value.compare_exchange_strong(expected, raw_val, std::memory_order_release, std::memory_order_relaxed)) {
      traits::release(value);
      return;
    }

    backoff();
  }
}

template <class T, class... Policies>
bool ramalhete_queue<T, Policies...>::try_pop(value_type &result)
{
  backoff backoff;

  guard_ptr h;
  for (;;) {
    // (9) - this acquire-load synchronizes-with the release-CAS (13)
    h.acquire(head, std::memory_order_acquire);

    // (10) - this acquire-load synchronizes-with the release-fetch-add (11)
    const auto pop_idx = h->pop_idx.load(std::memory_order_acquire);
    // This synchronization is necessary to avoid a situation where we see an up-to-date
    // pop_idx, but an out-of-date push_idx and would (falsely) assume that the queue is empty.
    const auto push_idx = h->push_idx.load(std::memory_order_relaxed);
    if (pop_idx >= push_idx &&
        h->next.load(std::memory_order_relaxed) == nullptr)
      break;

    // (11) - this release-fetch-add synchronizes-with the acquire-load (10)
    unsigned idx = h->pop_idx.fetch_add(step_size, std::memory_order_release);
    if (idx >= max_idx) {
      // This node has been drained; check if there is another one.
      // (12) - this acquire-load synchronizes-with the release-CAS (4)
      auto next = h->next.load(std::memory_order_acquire);
      if (next == nullptr)
        break; // No more nodes in the queue.

      marked_ptr expected = h;
      // (13) - this release-CAS synchronizes-with the acquire-load (1, 9)
      if (head.compare_exchange_strong(expected, next, std::memory_order_release, std::memory_order_relaxed))
        h.reclaim(); // The old node has been unlinked -> reclaim it.

      continue;
    }
    idx %= entries_per_node;

    auto value = h->entries[idx].value.load(std::memory_order_relaxed);
    if constexpr (pop_retries > 0) {
      // Spin for a bounded number of iterations in case a pending push has already
      // claimed this slot but has not yet stored its value.
      unsigned cnt = 0;
      ramalhete_queue::backoff retry_backoff;
      while (value == nullptr && ++cnt <= pop_retries) {
        value = h->entries[idx].value.load(std::memory_order_relaxed);
        retry_backoff(); // TODO - use a backoff type that can be configured separately
      }
    }

    if (value != nullptr) {
      // (14) - this acquire-load synchronizes-with the release-CAS (8)
      h->entries[idx].value.load(std::memory_order_acquire);
      traits::store(result, value.get());
      return true;
    } else {
      // (15) - this acquire-exchange synchronizes-with the release-CAS (8)
      // Mark the entry so that a pending push to this slot fails its CAS and retries
      // with a new index.
      auto value = h->entries[idx].value.exchange(marked_value(nullptr, 1), std::memory_order_acquire);
      if (value != nullptr) {
        traits::store(result, value.get());
        return true;
      }
    }

    backoff();
  }

  return false;
}
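
// Illustrative sketch of concurrent use (not part of this header): try_pop returns false
// when the queue appears empty, so consumers typically keep polling or back off. The
// queue_t alias from the usage sketch above and the item count are assumptions made only
// for this example.
//
//   queue_t q;
//   std::thread producer([&] {
//     for (int i = 0; i < 1000; ++i)
//       q.push(new int(i));
//   });
//   std::thread consumer([&] {
//     int received = 0;
//     int* v = nullptr;
//     while (received < 1000)
//       if (q.try_pop(v)) {
//         ++received;
//         delete v;
//       }
//   });
//   producer.join();
//   consumer.join();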
}

#ifdef _MSC_VER
#pragma warning(pop)
#endif

#endif