Disruptor pattern in a lock-free producer-consumer queue












5












$begingroup$


I'm implementing a quick-and-dirty lock-free MPMC queue in C++, based on the v3 Disruptor algorithm. The implementation is pretty simple:




  • Data to be shared between threads is stored in a ring buffer.

  • The ring buffer maintains a sequence number that points to the next available slot.

  • Consumer threads maintain their own sequence numbers indicating their current read position.

  • Producer threads claim a slot in the buffer by atomically incrementing its sequence number. To avoid overwriting data that is yet to be read, the producers block until every consumer has caught up (implementation omitted for brevity).

  • Once a producer has finished writing to its slot, it stores the current queue generation into a second ring buffer to indicate that the data has been published.

  • Consumers block until the published buffer contains the expected generation count, indicating that data is ready to be used.


I'm a bit fuzzy on memory barriers, and wanted to check that I'm using them appropriately. Writes to the data buffer must be completed before corresponding writes to the published buffer. I use a release barrier when writing to the published buffer, and an acquire barrier when reading from it. Is this correct?



template <class T, std::size_t n>
class mpmc_ring_buffer {
static constexpr auto length = next_power_of_2(n);
static constexpr auto mask = length - 1;
static constexpr auto shift = log2(length);

public:
ring_buffer () {
for (std::size_t i = 0; i < length; ++i)
published[i] = -1;
}

auto push (T && t) {
auto seq = sequence.fetch_add(1, std::memory_order_relaxed);
// TODO: Check for wraparound and spin while consumers catch up
data[seq & mask] = std::forward<T &&>(t);
published[seq & mask].store(seq >> shift, std::memory_order_release);
return seq;
}

T peek (std::size_t seq) {
while (published[seq & mask].load(std::memory_order_acquire) != static_cast<std::uint_fast8_t>(seq >> shift));
return data[seq & mask];
}

private:
std::atomic_size_t sequence alignas(CACHE_LINE_SIZE) {};
T data alignas(CACHE_LINE_SIZE) [length];
std::atomic_uint_fast8_t published alignas(CACHE_LINE_SIZE) [length];
};









share|improve this question











$endgroup$





This question has an open bounty worth +50
reputation from alex ending in 7 days.


This question has not received enough attention.





















    5












    $begingroup$


    I'm implementing a quick-and-dirty lock-free MPMC queue in C++, based on the v3 Disruptor algorithm. The implementation is pretty simple:




    • Data to be shared between threads is stored in a ring buffer.

    • The ring buffer maintains a sequence number that points to the next available slot.

    • Consumer threads maintain their own sequence numbers indicating their current read position.

    • Producer threads claim a slot in the buffer by atomically incrementing its sequence number. To avoid overwriting data that is yet to be read, the producers block until every consumer has caught up (implementation omitted for brevity).

    • Once a producer has finished writing to its slot, it stores the current queue generation into a second ring buffer to indicate that the data has been published.

    • Consumers block until the published buffer contains the expected generation count, indicating that data is ready to be used.


    I'm a bit fuzzy on memory barriers, and wanted to check that I'm using them appropriately. Writes to the data buffer must be completed before corresponding writes to the published buffer. I use a release barrier when writing to the published buffer, and an acquire barrier when reading from it. Is this correct?



    template <class T, std::size_t n>
    class mpmc_ring_buffer {
    static constexpr auto length = next_power_of_2(n);
    static constexpr auto mask = length - 1;
    static constexpr auto shift = log2(length);

    public:
    ring_buffer () {
    for (std::size_t i = 0; i < length; ++i)
    published[i] = -1;
    }

    auto push (T && t) {
    auto seq = sequence.fetch_add(1, std::memory_order_relaxed);
    // TODO: Check for wraparound and spin while consumers catch up
    data[seq & mask] = std::forward<T &&>(t);
    published[seq & mask].store(seq >> shift, std::memory_order_release);
    return seq;
    }

    T peek (std::size_t seq) {
    while (published[seq & mask].load(std::memory_order_acquire) != static_cast<std::uint_fast8_t>(seq >> shift));
    return data[seq & mask];
    }

    private:
    std::atomic_size_t sequence alignas(CACHE_LINE_SIZE) {};
    T data alignas(CACHE_LINE_SIZE) [length];
    std::atomic_uint_fast8_t published alignas(CACHE_LINE_SIZE) [length];
    };









    share|improve this question











    $endgroup$





    This question has an open bounty worth +50
    reputation from alex ending in 7 days.


    This question has not received enough attention.



















      5












      5








      5


      4



      $begingroup$


      I'm implementing a quick-and-dirty lock-free MPMC queue in C++, based on the v3 Disruptor algorithm. The implementation is pretty simple:




      • Data to be shared between threads is stored in a ring buffer.

      • The ring buffer maintains a sequence number that points to the next available slot.

      • Consumer threads maintain their own sequence numbers indicating their current read position.

      • Producer threads claim a slot in the buffer by atomically incrementing its sequence number. To avoid overwriting data that is yet to be read, the producers block until every consumer has caught up (implementation omitted for brevity).

      • Once a producer has finished writing to its slot, it stores the current queue generation into a second ring buffer to indicate that the data has been published.

      • Consumers block until the published buffer contains the expected generation count, indicating that data is ready to be used.


      I'm a bit fuzzy on memory barriers, and wanted to check that I'm using them appropriately. Writes to the data buffer must be completed before corresponding writes to the published buffer. I use a release barrier when writing to the published buffer, and an acquire barrier when reading from it. Is this correct?



      template <class T, std::size_t n>
      class mpmc_ring_buffer {
      static constexpr auto length = next_power_of_2(n);
      static constexpr auto mask = length - 1;
      static constexpr auto shift = log2(length);

      public:
      ring_buffer () {
      for (std::size_t i = 0; i < length; ++i)
      published[i] = -1;
      }

      auto push (T && t) {
      auto seq = sequence.fetch_add(1, std::memory_order_relaxed);
      // TODO: Check for wraparound and spin while consumers catch up
      data[seq & mask] = std::forward<T &&>(t);
      published[seq & mask].store(seq >> shift, std::memory_order_release);
      return seq;
      }

      T peek (std::size_t seq) {
      while (published[seq & mask].load(std::memory_order_acquire) != static_cast<std::uint_fast8_t>(seq >> shift));
      return data[seq & mask];
      }

      private:
      std::atomic_size_t sequence alignas(CACHE_LINE_SIZE) {};
      T data alignas(CACHE_LINE_SIZE) [length];
      std::atomic_uint_fast8_t published alignas(CACHE_LINE_SIZE) [length];
      };









      share|improve this question











      $endgroup$




      I'm implementing a quick-and-dirty lock-free MPMC queue in C++, based on the v3 Disruptor algorithm. The implementation is pretty simple:




      • Data to be shared between threads is stored in a ring buffer.

      • The ring buffer maintains a sequence number that points to the next available slot.

      • Consumer threads maintain their own sequence numbers indicating their current read position.

      • Producer threads claim a slot in the buffer by atomically incrementing its sequence number. To avoid overwriting data that is yet to be read, the producers block until every consumer has caught up (implementation omitted for brevity).

      • Once a producer has finished writing to its slot, it stores the current queue generation into a second ring buffer to indicate that the data has been published.

      • Consumers block until the published buffer contains the expected generation count, indicating that data is ready to be used.


      I'm a bit fuzzy on memory barriers, and wanted to check that I'm using them appropriately. Writes to the data buffer must be completed before corresponding writes to the published buffer. I use a release barrier when writing to the published buffer, and an acquire barrier when reading from it. Is this correct?



      template <class T, std::size_t n>
      class mpmc_ring_buffer {
      static constexpr auto length = next_power_of_2(n);
      static constexpr auto mask = length - 1;
      static constexpr auto shift = log2(length);

      public:
      ring_buffer () {
      for (std::size_t i = 0; i < length; ++i)
      published[i] = -1;
      }

      auto push (T && t) {
      auto seq = sequence.fetch_add(1, std::memory_order_relaxed);
      // TODO: Check for wraparound and spin while consumers catch up
      data[seq & mask] = std::forward<T &&>(t);
      published[seq & mask].store(seq >> shift, std::memory_order_release);
      return seq;
      }

      T peek (std::size_t seq) {
      while (published[seq & mask].load(std::memory_order_acquire) != static_cast<std::uint_fast8_t>(seq >> shift));
      return data[seq & mask];
      }

      private:
      std::atomic_size_t sequence alignas(CACHE_LINE_SIZE) {};
      T data alignas(CACHE_LINE_SIZE) [length];
      std::atomic_uint_fast8_t published alignas(CACHE_LINE_SIZE) [length];
      };






      c++ circular-list c++14 lock-free producer-consumer






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Mar 26 '15 at 10:40







      linguamachina

















      asked Mar 25 '15 at 17:38









      linguamachinalinguamachina

      21518




      21518






      This question has an open bounty worth +50
      reputation from alex ending in 7 days.


      This question has not received enough attention.








      This question has an open bounty worth +50
      reputation from alex ending in 7 days.


      This question has not received enough attention.
























          0






          active

          oldest

          votes











          Your Answer





          StackExchange.ifUsing("editor", function () {
          return StackExchange.using("mathjaxEditing", function () {
          StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix) {
          StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
          });
          });
          }, "mathjax-editing");

          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "196"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: false,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: null,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f85000%2fdisruptor-pattern-in-a-lock-free-producer-consumer-queue%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          0






          active

          oldest

          votes








          0






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes
















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Code Review Stack Exchange!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          Use MathJax to format equations. MathJax reference.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f85000%2fdisruptor-pattern-in-a-lock-free-producer-consumer-queue%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          How to make a Squid Proxy server?

          第一次世界大戦

          Touch on Surface Book