Disruptor pattern in a lock-free producer-consumer queue
$begingroup$
I'm implementing a quick-and-dirty lock-free MPMC queue in C++, based on the v3 Disruptor algorithm. The implementation is pretty simple:
- Data to be shared between threads is stored in a ring buffer.
- The ring buffer maintains a sequence number that points to the next available slot.
- Consumer threads maintain their own sequence numbers indicating their current read position.
- Producer threads claim a slot in the buffer by atomically incrementing its sequence number. To avoid overwriting data that is yet to be read, the producers block until every consumer has caught up (implementation omitted for brevity).
- Once a producer has finished writing to its slot, it stores the current queue generation into a second ring buffer to indicate that the data has been published.
- Consumers block until the published buffer contains the expected generation count, indicating that data is ready to be used.
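The slot and generation arithmetic in the last two points can be made concrete with a small sketch. The fixed capacity of 8, the helper names `slot_of` and `generation_of`, and the use of exactly 8 bits for the generation tag are assumptions made for illustration; the class itself derives its constants from the template parameter and uses `std::uint_fast8_t`, which may be wider than 8 bits.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical fixed capacity for illustration only.
constexpr std::size_t length = 8;          // must be a power of two
constexpr std::size_t mask   = length - 1; // 0b111
constexpr std::size_t shift  = 3;          // log2(length)

// Slot in the ring that a given sequence number maps to.
constexpr std::size_t slot_of(std::size_t seq) { return seq & mask; }

// Lap ("generation") of the ring, truncated to 8 bits (an assumption here).
constexpr std::uint8_t generation_of(std::size_t seq) {
    return static_cast<std::uint8_t>(seq >> shift);
}

// Sequence numbers 9 and 17 share slot 1 but carry different generations,
// which is how a consumer tells a fresh write from a stale one.
void check_slot_arithmetic() {
    assert(slot_of(9) == 1 && generation_of(9) == 1);
    assert(slot_of(17) == 1 && generation_of(17) == 2);
    // With an 8-bit tag, sequence numbers 256 laps apart produce the same tag.
    assert(generation_of(9 + 256 * length) == generation_of(9));
}
```

Under these assumptions the tag only distinguishes laps that are fewer than 256 apart, so the scheme relies on the wraparound check keeping producers and consumers within that distance.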
I'm a bit fuzzy on memory barriers, and wanted to check that I'm using them appropriately. Writes to the data buffer must be completed before the corresponding writes to the published buffer become visible. I use release ordering on the store to the published buffer, and acquire ordering on the load from it. Is this correct?
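// Assumes constexpr helpers next_power_of_2 and log2, and a CACHE_LINE_SIZE
// constant, are defined elsewhere.
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <utility>

template <class T, std::size_t n>
class mpmc_ring_buffer {
    static constexpr auto length = next_power_of_2(n);
    static constexpr auto mask = length - 1;
    static constexpr auto shift = log2(length);

public:
    mpmc_ring_buffer () {
        // Mark every slot as unpublished; -1 converts to the maximum value
        // of the unsigned generation type.
        for (std::size_t i = 0; i < length; ++i)
            published[i] = -1;
    }

    auto push (T && t) {
        // Claim the next slot.
        auto seq = sequence.fetch_add(1, std::memory_order_relaxed);
        // TODO: Check for wraparound and spin while consumers catch up
        // T && is a plain rvalue reference here, so move rather than forward.
        data[seq & mask] = std::move(t);
        // Publish the slot by storing its generation.
        published[seq & mask].store(static_cast<std::uint_fast8_t>(seq >> shift), std::memory_order_release);
        return seq;
    }

    T peek (std::size_t seq) {
        // Spin until the slot holds the expected generation.
        while (published[seq & mask].load(std::memory_order_acquire) != static_cast<std::uint_fast8_t>(seq >> shift));
        return data[seq & mask];
    }

private:
    std::atomic_size_t sequence alignas(CACHE_LINE_SIZE) {};
    T data alignas(CACHE_LINE_SIZE) [length];
    std::atomic_uint_fast8_t published alignas(CACHE_LINE_SIZE) [length];
};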
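The release/acquire pairing asked about above can be isolated in a two-thread sketch. This is a hypothetical single-slot handoff, not the queue itself: the names `payload`, `ready`, `producer`, `consumer` and `run_handoff` are invented for illustration, and the sketch says nothing about the omitted wraparound check.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

int payload = 0;                 // plain, non-atomic data (stands in for a data slot)
std::atomic<bool> ready{false};  // publication flag (stands in for a published slot)

void producer() {
    payload = 42;                                  // (1) write the data
    ready.store(true, std::memory_order_release);  // (2) publish it
}

int consumer() {
    // (3) Spin until the flag written in (2) is observed.
    while (!ready.load(std::memory_order_acquire))
        std::this_thread::yield();
    // (4) The acquire load that read `true` synchronizes with the release
    // store, so the write in (1) is visible here.
    return payload;
}

// Runs one handoff and returns the value the consumer observed.
int run_handoff() {
    payload = 0;
    ready.store(false, std::memory_order_relaxed);
    int seen = -1;
    std::thread c([&] { seen = consumer(); });
    std::thread p(producer);
    p.join();
    c.join();
    assert(seen == 42);
    return seen;
}
```

The guarantee only covers writes sequenced before the release store in the same thread, which in this sketch is the single write to `payload`.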
c++ circular-list c++14 lock-free producer-consumer
$endgroup$
This question has an open bounty worth +50
reputation from alex ending in 7 days.
This question has not received enough attention.
edited Mar 26 '15 at 10:40
linguamachina
asked Mar 25 '15 at 17:38
linguamachina