Reading shards from a Kinesis stream












Here is part of my implementation of reading a Kinesis stream. I am not confident that this is the best way to synchronize returns from goroutines.

Assuming there are N shards in the stream, the code spawns one goroutine per shard for processing. Each goroutine works indefinitely until the underlying context is canceled or until an error is encountered during processing.

The code below is supposed to wait either for all N routines to terminate successfully (i.e. without any of them writing to errc) or for at least one routine to write to errc (return an error).

If all routines terminate without writing to errc, the WaitGroup lock is released, closing the errc and done channels, which in turn resumes the current thread. However, if one of the routines terminates by writing to errc, we force all routines to terminate by calling the underlying context's cancel function and wait for the done channel to close.



func (c *Consumer) scanShards(col Collector, streamName string, ids []string) error {
    checkpoints, err := col.GetSequenceNumberForShards(streamName, ids)
    if err != nil {
        return fmt.Errorf("error retrieving stored checkpoints for shards: %v", err)
    }

    errc := make(chan error, 1)
    done := make(chan error, 1)
    wg := sync.WaitGroup{}
    wg.Add(len(ids))
    for _, id := range ids {
        seqNum := checkpoints[id]

        ctx, cancel := context.WithCancel(context.Background())
        c.cancelFuncs = append(c.cancelFuncs, cancel)
        go func(ctx context.Context, shardID, startSeqNum string) {
            defer wg.Done()
            if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
                errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
            }
        }(ctx, id, seqNum)
    }
    go func() {
        wg.Wait()
        close(errc)
        close(done)
    }()
    err = <-errc
    if err != nil {
        // Cancel all scans, to release the worker goroutines from
        // the pool.
        for _, cancel := range c.cancelFuncs {
            cancel()
        }
    }
    <-done // Wait for all goroutines to exit.
    return err
}









go concurrency

asked Jan 1 at 18:06 by States, edited yesterday by 200_success

1 Answer
Alright, there are a number of issues with the code you posted here. I'll go through it all and point out the problems, step by step:



func (c *Consumer) scanShards(col Collector, streamName string, ids []string) error {

Alright, so we have a *Consumer (exported type) with a non-exported scanShards method. That's fine. The thing I'm slightly baffled by is why the Collector is passed as an argument here. In AWS Kinesis terms, a Collector is what you use to send stuff to Kinesis in a single request. To me, it's sort of like a buffered writer. This collector, however, seems to provide the consumer with data about the shards that you're about to consume. The collector is actually a core dependency of your consumer, it seems. It'd make sense for the consumer to have the underlying collector assigned to a field:

type Consumer struct {
    collector   Collector            // rename needed, though
    cancelFuncs []context.CancelFunc // more on this later!
}

Because you're dealing with streams, I'd strongly recommend you use the context package, too (I know you are, but you're not getting the most out of it - see below). Convention dictates that the context argument comes first, so the function would now look like this:

func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {


The arguments could probably do with some renaming, but I'm probably the worst person to suggest a good variable name. Anyway, onwards:

    checkpoints, err := col.GetSequenceNumberForShards(streamName, ids)
    if err != nil {
        return fmt.Errorf("error retrieving stored checkpoints for shards: %v", err)
    }

Right, so the "collector" has an exported function for getting sequence numbers, and you're expecting the collector to be passed in as an argument. This is kind of weird. It'd make much more sense for someone to use a Consumer object and call something like consumer.GetShards(), because that actually tells me something: I know that I'm working with a consumer that is taking its data from shards X, Y, and Z.

I've looked at what comes next, but I'll be returning to the checkpoints variable later on. First, let me recommend you look at the github.com/pkg/errors package. You're essentially returning a specific error (which is OK), but it's a hard one to detect on the caller side, and the string value of the error still contains all of the raw/initial error value. With that package, you'd be able to write the same like so:

return errors.Wrap(err, "error retrieving stored checkpoints for shards")

Replace the string literal with an exported constant like so:

const ErrRetrievingCheckpoints = "error retrieving stored checkpoints for shards"
// and use:
return errors.Wrap(err, ErrRetrievingCheckpoints)

And you'll have a much easier time actually detecting which error was returned, without losing the underlying error details.



OK, let's move on to the channel stuff now:

    errc := make(chan error, 1)
    done := make(chan error, 1)
    wg := sync.WaitGroup{}
    wg.Add(len(ids))

Right, so you're creating a WaitGroup and adding the length of ids to it, so there's going to be one routine per ID. Fine. You also create an error channel and a done channel, both with a buffer of 1. I'll explain how in a bit, but you can (and should) get rid of the done channel entirely. Having said that, channels like done are generally defined as done := make(chan struct{}): no buffer needed, and the empty struct is defined in the Go spec as a zero-byte type.



The error channel, however, has a bit of a problem. Suppose I'm going to start 10 routines, and I've got 3 errors. The first routine to err will write its error value onto the channel and be done with it. The second and third routines to fail will be blocked, waiting for you to read the error from the channel. You have the code to do this, so you'll get that error, and the second routine will now write a new error value to the channel. The third routine is still blocked!

What your code does next is cancel all routines (sort of), and wait for the wg.Wait() call to return. Because the third routine is blocked, this will never happen. You've got a deadlock on your hands. The done channel won't get closed, and thus this function won't return, all because your error channel is blocking routines from completing. That's not what we want. There's a quick (and not always ideal) solution to this problem: increase the channel buffer:

errCh := make(chan error, len(ids)-1) // we'll always read 1, hence -1 is big enough

Of course, we're assuming ids is not empty. Just in case, it's always a good idea to add some basic checks like if len(ids) == 0 and return early.



Still, I'll explain later how we fix this without having to increase the channel buffers too much. For now, let's carry on with the line-by-line review:



    for _, id := range ids {
        seqNum := checkpoints[id]

Yes, I have an issue with this already. checkpoints clearly is a var of type map[string]string. I know this because ids is []string, and the seqNum variable is passed to a goroutine as a string. Why, then, iterate over a slice, and then perform a lookup in a map to get the corresponding value? Why not simply write:

for id, seqNum := range checkpoints {

That's a lot cleaner, and reduces the risk of someone else looking at your code and adding something like this:

seqNum, ok := checkpoints[id]
if !ok {
    return errors.New("checkpoint missing")
}

Never mind the risk of someone changing the GetSequenceNumberForShards function to not return an error if not all IDs were found! Suppose your ids var holds {"foo", "bar", "zar", "car"}, but the checkpoints map was returned without the zar key...

For the same reason, you really should replace wg.Add(len(ids)) with wg.Add(len(checkpoints)).



Right, onwards; let's get stuck in with the context:

    ctx, cancel := context.WithCancel(context.Background())
    c.cancelFuncs = append(c.cancelFuncs, cancel)

Why? Why on earth? You're creating a cancel context for each routine, and appending the cancel func to a slice without a way to retrieve a specific cancel function. The slice is also stored on the Consumer type. The function uses a pointer receiver, so there's a genuine risk of race conditions here! Suppose I do this:

go consumer.scanShards(col1, "foo", []string{"a", "b", "c"})
go consumer.scanShards(col2, "bar", []string{"foo", "bar", "car", "zar"})

Both routines are appending to the same slice of cancel functions, and they're both calling all of them blindly, too. These routines are messing things up for each other. And that's just one example; imagine someone coming along and writing this:

func (c *Consumer) Reset() {
    // restoring some fields
    c.cancelFuncs = []context.CancelFunc{}
    // more fields here
}

These kinds of things can be really hard to debug, and your code is very vulnerable to these issues. You may think that the Reset func I've written here is an unlikely scenario, but given that there's nothing in your code that actually removes the cancel values from the slice, your slice will grow, and grow, and grow, and things will get slowed down.

OK, I had to rant about these things somewhat, but remember how I said that you ought to have your first argument be a context? Right, do it: it allows the caller to pass in a context.WithCancel or context.WithTimeout, for example, so the caller can determine whether they want to/need to wait on your function to return. Secondly: why not have all your routines share the same context? You're just re-wrapping the Background context anyway. Instead of doing that, I'd simply wrap the context from the argument once (outside the loop):

rctx, cfunc := context.WithCancel(ctx)

If the context passed as an argument gets cancelled, the cancellation will propagate; if you want to cancel the context yourself, call cfunc, and all your routines will receive the cancellation signal (<-ctx.Done()). The caller is unaffected.



So with this in mind, let's rewrite the loop a bit (more improvements to follow below, but here's what we have thus far):

func (c *Consumer) scanShards(ctx context.Context, stream string, ids []string) error {
    if len(ids) == 0 {
        return ErrNoIdsProvided
    }
    checkpoints, err := c.collector.GetSequenceNumberForShards(stream, ids)
    if err != nil {
        return errors.Wrap(err, ErrRetrievingCheckpoints)
    }
    if len(checkpoints) == 0 {
        return ErrNothingToCheck // something like this, should be handled properly
    }
    // note: ids is irrelevant, checkpoints is the source of truth now
    errCh := make(chan error, len(checkpoints)-1)
    // we'll get rid of this, but I've not explained how, so it's here still:
    done := make(chan struct{})
    wg := sync.WaitGroup{}
    wg.Add(len(checkpoints))
    // wrap around the ctx argument once!
    rctx, cfunc := context.WithCancel(ctx)

    for id, seqNum := range checkpoints {
        go func(ctx context.Context, shardID, startSeqNum string) {
            defer wg.Done()
            if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
                errCh <- fmt.Errorf("error in shard %q: %v", shardID, err)
            }
        }(rctx, id, seqNum)
    }


OK, that's where we are now. Because rctx is not in the for scope and is shared by all routines, you can simply write the routine like so:

go func(id, seqNum string) {
    defer wg.Done()
    // note: rctx, not ctx!
    if err := c.scanShard(rctx, id, seqNum); err != nil {
        errCh <- err
    }
}(id, seqNum)

The routine centers around c.scanShard, a function you've not posted, so I have no idea what it does (how it uses the context, etc.). What is clear to me, though, is that it's an unexported function, and you have control over it. You could just as easily make it into a function that behaves exactly like the routine you wrapped the call in, so you can replace the go func() bit simply with this:

go c.scanShard(rctx, &wg, id, seqNum, errCh)
// given scanShard looks like this:
func (c *Consumer) scanShard(ctx context.Context, wg *sync.WaitGroup, id, seqNum string, errCh chan<- error)


Anyway, the rest of your code is about tying things together:

    go func() {
        wg.Wait()
        close(errc)
        close(done)
    }()

So you're waiting for the routines to return before closing the channels, which is fair enough, seeing as writing to a closed channel is bad. Closing the done channel here, though, is what actually allows your function to return. But does it add any value?

    err = <-errc
    if err != nil {
        for _, cancel := range c.cancelFuncs {
            cancel()
        }
    }
    <-done // Wait for all goroutines to exit.
    return err
}

And this is where things get really messy: you're first checking the error channel (fair enough), but because you don't know whether the blocking read (err = <-errc) returned because the channel was closed or because an error was written, you have to check whether you actually have an error. If not, you're still checking the done channel, completely pointlessly. Even with your code as it stands, the done channel is completely redundant; the error channel already does the same thing.

Get rid of done; even with your current code, it serves no purpose whatsoever.





Alternative approach

I've been ranting plenty for now, and I think this is a decent starting point. When I started looking at your code, my mind did jump to select statements, and to using ctx.Done() to stop calling the scanShard function. I'm not sure it's actually the better way for this particular case, but it might be worth considering. I'll just include a quick write-up of how I'd write this function using ctx.Done and select to control the flow. It's a bit more verbose, and actually looks more complex than it needs to be IMHO, but it's just to show an alternative approach that in some cases might be worth considering. I have to say, all code in this answer is untested and just written as I went along, so typos and bugs are possible:

package main

import (
    "context"
    "fmt"
    "sync"
)

func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {
    checkpoints, err := c.collector.GetSequenceNumberForShards(streamName, ids)
    if err != nil {
        // zerolog
        c.logger.Error().Err(err).Msg("Error retrieving stored checkpoints for shards")
        return err
    }

    errc := make(chan error, 1)
    defer close(errc)
    wg := sync.WaitGroup{}
    wg.Add(len(checkpoints))
    rctx, cfunc := context.WithCancel(ctx)
    for id, seqNum := range checkpoints {
        go func(shardID, startSeqNum string) {
            defer wg.Done()
            select {
            case <-rctx.Done():
                // context cancelled, no need to do anything
                return
            default:
                if err := c.scanShard(rctx, shardID, startSeqNum); err != nil {
                    c.logger.Error().Err(err).Msgf("Error in shard %q", shardID)
                    errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
                }
            }
        }(id, seqNum)
    }
    go func() {
        // wg cleared -> cancel context
        wg.Wait()
        cfunc()
    }()
    select {
    case <-rctx.Done():
        // rctx was cancelled after the waitgroup cleared, so nothing to do here
        return nil
    case err := <-errc:
        // error -> cancel routines
        cfunc()
        return err // returning will close the channel
    }
}








            func (c *Consumer) Reset() {
            // restoring some fields
            c.cancelFuncs = context.CancelFunc{}
            // more fields here
            }


            These kind of things can be really hard to debug, and your code is very vulnerable to these issues. You may think that the Reset func I've written here is an unlikely scenario, but given that there's nothing in your code that actually removes the cancel values from the slice, your slice will grow, and grow, and grow, and things will get slowed down.



            OK, I had to rant about these things somewhat, but remember how I said that you ought to have your first argument be a context? Right, do it, it allows the caller to pass in a context.WithCancel, or context.WithTimeout, for example, so the caller can determine whether they want to/need to wait on your function to return. Secondly: why not have all your routines share the same context?. You're just re-wrapping the Background context anyway. Instead of doing that, I'd simply wrap the context from the argument once (outside the loop):



            rctx, cfunc := context.WithCancel(ctx)


            If the context passed as an argument gets cancelled, the cancellation will propagate, if you want to cancel the context, call cfunc, and all your routines will receive the cancellation signal (<-ctx.Done()). The caller is unaffected.



            So with this in mind, let's rewrite the loop a bit (more improvements to follow below, but what we have thusfar):



            func (c *Consumer) scanShards(ctx context.Context, stream string, ids string) error {
            if len(ids) == 0 {
            return ErrNoIdsProvided
            }
            checkpoints, err := c.collector.GetSequenceNumberForShards(stream, ids)
            if err != nil {
            return errors.Wrap(err, ErrRetrievingCheckpoints)
            }
            if len(checkpoints) == 0 {
            return ErrNothingToCheck // something like this, should be handled properly
            }
            // note ids is irrelevant, checkpoints is the source of truth now
            errCh := make(chan error, len(checkpoints) - 1)
            // we'll get rid of this, but I've not explained how, so it's here still:
            done := make(chan struct{})
            wg := sync.WaitGroup{}
            wg.Add(len(checkpoints))
            // wrap around ctx argument once!
            rctx, cfunc := context.WithCancel(ctx)

            for id, seqNum := range checkpoints {
            go func(ctx context.Context, shardID, startSeqNum string) {
            defer wg.Done()
            if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
            errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
            }
            }(rctx, id, seqNum)
            }


            OK, that's where we are now. Because the rctx is not in the for scope, and is shared by all routines, you can simply write the routine like so:



            go func(id, seqNum string) {
            defer wg.Done()
            // note: rctx, not ctx!
            if err := c.scanShard(rctx, id, seqNum); err != nil {
            errCh <- err
            }
            }(id, seqNum)


            The routine centers around c.scanShard, a function you've not posted, and thus I have no idea what it does (how it uses the context etc...). What is clear to me, though, is that it's an unexported function, and you have control over it. You could just as easily make it into a function that behaves exactly like the routine you wrapped the call in, so you can replace the go func() bit simply with this:



            go c.scanShard(rctx, &wg, id, seqNum, errCh)
            // given scanShard looks like this:
            func (c *Consumer) scanShard(ctx context.Context, wg *sync.WaitGroup, id, seqNum string, errCh chan<- error)


            Anyway, the rest of your code is about tying things together:



                go func() {
            wg.Wait()
            close(errc)
            close(done)
            }()


            So waiting for the routines to return before closing the channels, which is fair enough, seeing as writing to a closed channel is bad. Closing the done channel here, though is what actually allows your function to return. But does it add any value?



                err = <-errc
            if err != nil {
            for _, cancel := range c.cancelFuncs {
            cancel()
            }
            }
            <-done // Wait for all goroutines to exit.
            return err
            }


            And this is where things get realy messy: you're first checking the error channel (fair enough), but because you're not sure whether or not you've passed the blocking read (err = <-errCh) because the channel was closed or not, you have to check whether or not you actually have an error. If not, you're still checking the done channel, completely pointlessly. Even with your code as it stands, the done channel is completely redundant, the error channel already does the same thing.



            Get rid of done, even with your current code, it serves no purpose whatsoever.





            Alternative approach



            I've been ranting plenty for now, and I think this is a decent starting point. When I started looking at your code, my mind did jump to select statements, and using the ctx.Done() to stop calling the scanShard function. I'm not sure if it's actually the better way for this particular case, but it might be worth considering. I'll just include a quick write-up of how I'd write this function using ctx.Done and select to control the flow. It's a bit more verbose, and actually looks more complex than it needs to be IMHO, but it's just to show an alternative approach, that in some cases might be worth considering. I have to say, all code in the answer is untested, and just written as I went along, so typo's and bugs are possible:



            package main

            import (
            "context"
            "fmt"
            "sync"
            )

            func (c *Consumer) scanShards(ctx context.Context, streamName string, ids string) error {
            checkpoints, err := c.collector.GetSequenceNumberForShards(streamName, ids)
            if err != nil {
            // zerolog
            c.logger.Error().Err(err).Msg("Error retrieving stored checkpoints for shards")
            return err
            }

            errc := make(chan error, 1)
            defer close(errc)
            wg := sync.WaitGroup{}
            wg.Add(len(checkpoints))
            rctx, cfunc := context.WithCancel(ctx)
            for id, seqNum := range checkpoints {
            go func(shardID, startSeqNum string) {
            defer wg.Done()
            select {
            case <-rctx.Done():
            // context cancelled, no need to do anything
            return
            default:
            if err := c.scanShard(rctx, shardID, startSeqNum); err != nil {
            c.logger.Error().Err(err).Msg("Error in shard %q", shardID)
            errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
            }
            }
            }(id, seqNum)
            }
            go func() {
            // wg cleared -> cancel context
            wg.Wait()
            cfunc()
            }()
            select {
            case <-ctx.Done():
            // ctx was closed after waitgroup, so nothing to do here
            return nil
            case err := <-errc:
            // error -> cancel routines
            cfunc()
            return err // return, this will close channel
            }
            }





            share|improve this answer


























              0














Alright, there are a number of issues with the code you posted here. I'll go through it all and point them out, step by step:



func (c *Consumer) scanShards(col Collector, streamName string, ids []string) error {


              Alright, so we have a *Consumer (exported type), with a non-exported scanShards function. That's fine. The thing I'm slightly baffled by is why the Collector is passed as an argument here. In AWS Kinesis terms, a Collector is what you use to send stuff to Kinesis in a single request. To me, it's sort of like a buffered writer. This collector, however, seems to provide the consumer with data about the shards that you're about to consume. The collector is actually a core dependency of your consumer, it seems. It'd make sense for the consumer to have the underlying collector assigned to a field:



type Consumer struct {
    collector   Collector            // rename needed, though
    cancelFuncs []context.CancelFunc // more on this later!
}


              Because you're dealing with streams, I'd strongly recommend you use the context package, too (I know you are, but you're not getting the most out of it - see below). Convention dictates that the context argument comes first, so the function would now look like this:



func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {


              The arguments could probably do with some renaming, but I'm probably the worst person to suggest a good variable name. Anyway, onwards:



checkpoints, err := col.GetSequenceNumberForShards(streamName, ids)
if err != nil {
    return fmt.Errorf("error retrieving stored checkpoints for shards: %v", err)
}


Right, so the "collector" has an exported function for getting sequence numbers, and you're expecting the collector to be passed in as an argument. This is kind of weird. It'd make much more sense for someone to use a Consumer object and call something like consumer.GetShards(), because that actually tells me something: I know that I'm working with a consumer that is taking its data from shards X, Y, and Z.

I've looked at what comes next, but I'll be returning to the checkpoints variable later on. First, let me just recommend you look at the github.com/pkg/errors package. You're essentially returning a specific error (which is OK), but it's a hard one to detect on the caller side. The string value of the error still contains all of the raw/initial error value. With that package, you'd be able to write the same like so:



              return errors.Wrap(err, "error retrieving stored checkpoints for shards")


Replace the string constant with an exported constant, like so:



const ErrRetrievingCheckpoints = "error retrieving stored checkpoints for shards"
// and use:
return errors.Wrap(err, ErrRetrievingCheckpoints)


              And you'll have a much easier time actually detecting what error was returned, without losing the underlying error details.



              OK, let's move on to the channel stuff now:



errc := make(chan error, 1)
done := make(chan error, 1)
wg := sync.WaitGroup{}
wg.Add(len(ids))


Right, so you're creating a waitgroup and adding the length of ids to it, so there's going to be one routine per ID. Fine. Then you create an error and a done channel, both with a buffer of 1. I'll explain how in a bit, but you can (and should) get rid of the done channel entirely. Having said that, channels like done are generally defined as done := make(chan struct{}): no buffer needed, and an empty struct is defined in the Go spec as a zero-byte type.



              The error channel, however, has a bit of a problem: Suppose I'm going to start 10 routines, and I've got 3 errors. The first routine to err will write its error value onto the channel, and be done with it. The second and third routines to fail will be blocked, waiting for you to read the error from the channel. You have the code to do this, so you'll get that error, and the second routine will now write a new error value to the channel. The third routine is still blocked!



What your code does next is cancel all routines (sort of), and wait for the wg.Wait() call to return. Because the third routine is blocked, this will never happen. You've got a deadlock on your hands. The done channel won't get closed, and thus this function won't return, all because your error channel is blocking routines from completing. That's not what we want. There's a quick (and not always ideal) solution to this problem: increase the channel buffer:
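To see why the bigger buffer fixes the deadlock, here's a minimal sketch (firstError is a made-up stand-in for the fan-out in scanShards) where every routine fails, yet wg.Wait() still returns:

```go
package main

import (
	"fmt"
	"sync"
)

// firstError fans out one failing worker per id and returns the first
// error, without leaving any sender blocked on the channel.
func firstError(ids []string) error {
	// one error is read directly, the rest fit in the buffer,
	// so no sender ever blocks — even if all of them fail
	errCh := make(chan error, len(ids)-1)
	var wg sync.WaitGroup
	wg.Add(len(ids))
	for _, id := range ids {
		go func(id string) {
			defer wg.Done()
			errCh <- fmt.Errorf("error in shard %q", id) // worst case: every routine errors
		}(id)
	}
	first := <-errCh // read a single error, as scanShards does
	wg.Wait()        // returns: no goroutine is stuck on a send
	return first
}

func main() {
	fmt.Println(firstError([]string{"a", "b", "c", "d"}))
}
```

With the buffer of 1 from the original code, two of those four senders would block forever and wg.Wait() would never return.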



errCh := make(chan error, len(ids)-1) // we'll always read 1, hence len(ids)-1 is big enough



              Of course, we're assuming ids is not empty. Just in case, it's always a good idea to add some basic checks like if len(ids) == 0 and return early.



              Still, I'll explain later how we fix this without having to increase the channel buffers too much. For now, let's carry on with the line-by-line review:



for _, id := range ids {
    seqNum := checkpoints[id]


Yes, I have an issue with this already. checkpoints is clearly a var of type map[string]string. I know this because ids is []string, and the seqNum variable is passed to a goroutine as a string. Why, then, iterate over a slice, and then perform a lookup in a map to get the corresponding value? Why not simply write:



              for id, seqNum := range checkpoints {


              That's a lot cleaner, and reduces the risk of someone else looking at your code and adding something like this:



seqNum, ok := checkpoints[id]
if !ok {
    return errors.New("checkpoint missing")
}


Never mind the risk of someone changing the GetSequenceNumberForShards function to not return an error if not all IDs were found! Suppose your ids var holds {"foo", "bar", "zar", "car"}, but the returned checkpoints map is missing the zar key...
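This matters because a map lookup for a missing key fails silently — you'd start a scan with an empty sequence number and never know. A quick demonstration of that scenario:

```go
package main

import "fmt"

func main() {
	checkpoints := map[string]string{"foo": "1", "bar": "2", "car": "4"}
	ids := []string{"foo", "bar", "zar", "car"}

	for _, id := range ids {
		// the missing "zar" key silently yields the zero value "" —
		// no error, no panic, just a bogus empty sequence number
		fmt.Printf("%s=%q\n", id, checkpoints[id])
	}
}
```

Ranging over the map instead makes that broken state unrepresentable.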



For the same reason, you really should replace wg.Add(len(ids)) with wg.Add(len(checkpoints)).



              Right, onwards, let's get stuck in with the context:



ctx, cancel := context.WithCancel(context.Background())
c.cancelFuncs = append(c.cancelFuncs, cancel)


              Why? Why on earth? You're creating a cancel context for each routine, and append the cancel func to a slice without a way to retrieve a specific cancel function. The slice is also stored on the Consumer type. The function uses a pointer receiver, so there's a genuine risk of race conditions here! Suppose I do this:



go consumer.scanShards(col1, "foo", []string{"a", "b", "c"})
go consumer.scanShards(col2, "bar", []string{"foo", "bar", "car", "zar"})


Both routines are appending to the same slice of cancel functions. They're both calling all of them blindly, too. These routines are messing things up for each other. And that's just one example; imagine someone coming along and writing this:



func (c *Consumer) Reset() {
    // restoring some fields
    c.cancelFuncs = []context.CancelFunc{}
    // more fields here
}


These kinds of things can be really hard to debug, and your code is very vulnerable to these issues. You may think that the Reset func I've written here is an unlikely scenario, but given that there's nothing in your code that actually removes the cancel values from the slice, your slice will grow, and grow, and grow, and things will get slowed down.



OK, I had to rant about these things somewhat, but remember how I said that you ought to have your first argument be a context? Right, do it: it allows the caller to pass in a context.WithCancel or context.WithTimeout, for example, so the caller can determine whether they want to/need to wait on your function to return. Secondly: why not have all your routines share the same context? You're just re-wrapping the Background context anyway. Instead of doing that, I'd simply wrap the context from the argument once (outside the loop):



              rctx, cfunc := context.WithCancel(ctx)


              If the context passed as an argument gets cancelled, the cancellation will propagate, if you want to cancel the context, call cfunc, and all your routines will receive the cancellation signal (<-ctx.Done()). The caller is unaffected.



So with this in mind, let's rewrite the loop a bit (more improvements to follow below, but this is what we have thus far):



func (c *Consumer) scanShards(ctx context.Context, stream string, ids []string) error {
    if len(ids) == 0 {
        return ErrNoIdsProvided
    }
    checkpoints, err := c.collector.GetSequenceNumberForShards(stream, ids)
    if err != nil {
        return errors.Wrap(err, ErrRetrievingCheckpoints)
    }
    if len(checkpoints) == 0 {
        return ErrNothingToCheck // something like this, should be handled properly
    }
    // note: ids is irrelevant from here on, checkpoints is the source of truth
    errCh := make(chan error, len(checkpoints)-1)
    // we'll get rid of this, but I've not explained how, so it's here still:
    done := make(chan struct{})
    wg := sync.WaitGroup{}
    wg.Add(len(checkpoints))
    // wrap around the ctx argument once!
    rctx, cfunc := context.WithCancel(ctx)

    for id, seqNum := range checkpoints {
        go func(ctx context.Context, shardID, startSeqNum string) {
            defer wg.Done()
            if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
                errCh <- fmt.Errorf("error in shard %q: %v", shardID, err)
            }
        }(rctx, id, seqNum)
    }


OK, that's where we are now. Because rctx is declared outside the for loop's scope and is shared by all routines, you can simply write the routine like so:



go func(id, seqNum string) {
    defer wg.Done()
    // note: rctx, not ctx!
    if err := c.scanShard(rctx, id, seqNum); err != nil {
        errCh <- err
    }
}(id, seqNum)


              The routine centers around c.scanShard, a function you've not posted, and thus I have no idea what it does (how it uses the context etc...). What is clear to me, though, is that it's an unexported function, and you have control over it. You could just as easily make it into a function that behaves exactly like the routine you wrapped the call in, so you can replace the go func() bit simply with this:



go c.scanShard(rctx, &wg, id, seqNum, errCh)
// given scanShard looks like this:
func (c *Consumer) scanShard(ctx context.Context, wg *sync.WaitGroup, id, seqNum string, errCh chan<- error)


              Anyway, the rest of your code is about tying things together:



go func() {
    wg.Wait()
    close(errc)
    close(done)
}()


So waiting for the routines to return before closing the channels, which is fair enough, seeing as writing to a closed channel is bad. Closing the done channel here, though, is what actually allows your function to return. But does it add any value?



err = <-errc
if err != nil {
    for _, cancel := range c.cancelFuncs {
        cancel()
    }
}
<-done // Wait for all goroutines to exit.
return err
}


And this is where things get really messy: you're first checking the error channel (fair enough), but because you're not sure whether you've passed the blocking read (err = <-errc) because the channel was closed or because an error was sent, you have to check whether you actually have an error. If not, you're still checking the done channel, completely pointlessly. Even with your code as it stands, the done channel is completely redundant: the error channel already does the same thing.



              Get rid of done, even with your current code, it serves no purpose whatsoever.
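The reason errc alone suffices: a receive on a closed channel never blocks, it returns the zero value immediately. So the blocking read already doubles as the "all routines finished" signal — a minimal demonstration:

```go
package main

import "fmt"

func main() {
	errc := make(chan error, 1)
	close(errc) // what the wg.Wait() goroutine does once all routines return

	// the receive does not block: a closed channel yields the zero
	// value, which for chan error is nil
	err := <-errc
	fmt.Println(err == nil) // no error means everything finished cleanly
}
```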





              Alternative approach



I've been ranting plenty for now, and I think this is a decent starting point. When I started looking at your code, my mind did jump to select statements, and using ctx.Done() to stop calling the scanShard function. I'm not sure it's actually the better way for this particular case, but it might be worth considering. I'll just include a quick write-up of how I'd write this function using ctx.Done and select to control the flow. It's a bit more verbose, and actually looks more complex than it needs to be IMHO, but it's just to show an alternative approach that in some cases might be worth considering. I have to say, all code in the answer is untested and just written as I went along, so typos and bugs are possible:



package main

import (
    "context"
    "fmt"
    "sync"
)

func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {
    checkpoints, err := c.collector.GetSequenceNumberForShards(streamName, ids)
    if err != nil {
        // zerolog
        c.logger.Error().Err(err).Msg("Error retrieving stored checkpoints for shards")
        return err
    }

    // buffered to len(checkpoints) so no routine ever blocks on a send;
    // no close needed, the channel is garbage-collected once all senders finish
    errc := make(chan error, len(checkpoints))
    wg := sync.WaitGroup{}
    wg.Add(len(checkpoints))
    rctx, cfunc := context.WithCancel(ctx)
    for id, seqNum := range checkpoints {
        go func(shardID, startSeqNum string) {
            defer wg.Done()
            select {
            case <-rctx.Done():
                // context cancelled, no need to do anything
                return
            default:
                if err := c.scanShard(rctx, shardID, startSeqNum); err != nil {
                    c.logger.Error().Err(err).Msgf("Error in shard %q", shardID)
                    errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
                }
            }
        }(id, seqNum)
    }
    go func() {
        // wg cleared -> cancel context
        wg.Wait()
        cfunc()
    }()
    select {
    case <-rctx.Done():
        // all routines finished (or the caller cancelled); report an
        // error if any routine managed to send one first
        select {
        case err := <-errc:
            return err
        default:
            return nil
        }
    case err := <-errc:
        // error -> cancel routines
        cfunc()
        return err
    }
}







                If the context passed as an argument gets cancelled, the cancellation will propagate, if you want to cancel the context, call cfunc, and all your routines will receive the cancellation signal (<-ctx.Done()). The caller is unaffected.



                So with this in mind, let's rewrite the loop a bit (more improvements to follow below, but what we have thusfar):



                func (c *Consumer) scanShards(ctx context.Context, stream string, ids string) error {
                if len(ids) == 0 {
                return ErrNoIdsProvided
                }
                checkpoints, err := c.collector.GetSequenceNumberForShards(stream, ids)
                if err != nil {
                return errors.Wrap(err, ErrRetrievingCheckpoints)
                }
                if len(checkpoints) == 0 {
                return ErrNothingToCheck // something like this, should be handled properly
                }
                // note ids is irrelevant, checkpoints is the source of truth now
                errCh := make(chan error, len(checkpoints) - 1)
                // we'll get rid of this, but I've not explained how, so it's here still:
                done := make(chan struct{})
                wg := sync.WaitGroup{}
                wg.Add(len(checkpoints))
                // wrap around ctx argument once!
                rctx, cfunc := context.WithCancel(ctx)

                for id, seqNum := range checkpoints {
                go func(ctx context.Context, shardID, startSeqNum string) {
                defer wg.Done()
                if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
                errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
                }
                }(rctx, id, seqNum)
                }


                OK, that's where we are now. Because the rctx is not in the for scope, and is shared by all routines, you can simply write the routine like so:



                go func(id, seqNum string) {
                defer wg.Done()
                // note: rctx, not ctx!
                if err := c.scanShard(rctx, id, seqNum); err != nil {
                errCh <- err
                }
                }(id, seqNum)


                The routine centers around c.scanShard, a function you've not posted, and thus I have no idea what it does (how it uses the context etc...). What is clear to me, though, is that it's an unexported function, and you have control over it. You could just as easily make it into a function that behaves exactly like the routine you wrapped the call in, so you can replace the go func() bit simply with this:



                go c.scanShard(rctx, &wg, id, seqNum, errCh)
                // given scanShard looks like this:
                func (c *Consumer) scanShard(ctx context.Context, wg *sync.WaitGroup, id, seqNum string, errCh chan<- error)


                Anyway, the rest of your code is about tying things together:



                    go func() {
                wg.Wait()
                close(errc)
                close(done)
                }()


                So waiting for the routines to return before closing the channels, which is fair enough, seeing as writing to a closed channel is bad. Closing the done channel here, though is what actually allows your function to return. But does it add any value?



                    err = <-errc
                if err != nil {
                for _, cancel := range c.cancelFuncs {
                cancel()
                }
                }
                <-done // Wait for all goroutines to exit.
                return err
                }


                And this is where things get realy messy: you're first checking the error channel (fair enough), but because you're not sure whether or not you've passed the blocking read (err = <-errCh) because the channel was closed or not, you have to check whether or not you actually have an error. If not, you're still checking the done channel, completely pointlessly. Even with your code as it stands, the done channel is completely redundant, the error channel already does the same thing.



                Get rid of done, even with your current code, it serves no purpose whatsoever.





                Alternative approach



                I've been ranting plenty for now, and I think this is a decent starting point. When I started looking at your code, my mind did jump to select statements, and using the ctx.Done() to stop calling the scanShard function. I'm not sure if it's actually the better way for this particular case, but it might be worth considering. I'll just include a quick write-up of how I'd write this function using ctx.Done and select to control the flow. It's a bit more verbose, and actually looks more complex than it needs to be IMHO, but it's just to show an alternative approach, that in some cases might be worth considering. I have to say, all code in the answer is untested, and just written as I went along, so typo's and bugs are possible:



                package main

                import (
                "context"
                "fmt"
                "sync"
                )

                func (c *Consumer) scanShards(ctx context.Context, streamName string, ids string) error {
                checkpoints, err := c.collector.GetSequenceNumberForShards(streamName, ids)
                if err != nil {
                // zerolog
                c.logger.Error().Err(err).Msg("Error retrieving stored checkpoints for shards")
                return err
                }

                errc := make(chan error, 1)
                defer close(errc)
                wg := sync.WaitGroup{}
                wg.Add(len(checkpoints))
                rctx, cfunc := context.WithCancel(ctx)
                for id, seqNum := range checkpoints {
                go func(shardID, startSeqNum string) {
                defer wg.Done()
                select {
                case <-rctx.Done():
                // context cancelled, no need to do anything
                return
                default:
                if err := c.scanShard(rctx, shardID, startSeqNum); err != nil {
                c.logger.Error().Err(err).Msg("Error in shard %q", shardID)
                errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
                }
                }
                }(id, seqNum)
                }
                go func() {
                // wg cleared -> cancel context
                wg.Wait()
                cfunc()
                }()
                select {
                case <-ctx.Done():
                // ctx was closed after waitgroup, so nothing to do here
                return nil
                case err := <-errc:
                // error -> cancel routines
                cfunc()
                return err // return, this will close channel
                }
                }





                share|improve this answer












                Alright, there's a number of issues with the code you posted here. I'll go through it all, and point out issues, step by step:



func (c *Consumer) scanShards(col Collector, streamName string, ids []string) error {


                Alright, so we have a *Consumer (exported type), with a non-exported scanShards function. That's fine. The thing I'm slightly baffled by is why the Collector is passed as an argument here. In AWS Kinesis terms, a Collector is what you use to send stuff to Kinesis in a single request. To me, it's sort of like a buffered writer. This collector, however, seems to provide the consumer with data about the shards that you're about to consume. The collector is actually a core dependency of your consumer, it seems. It'd make sense for the consumer to have the underlying collector assigned to a field:



type Consumer struct {
    collector   Collector            // rename needed, though
    cancelFuncs []context.CancelFunc // more on this later!
}


                Because you're dealing with streams, I'd strongly recommend you use the context package, too (I know you are, but you're not getting the most out of it - see below). Convention dictates that the context argument comes first, so the function would now look like this:



func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {


                The arguments could probably do with some renaming, but I'm probably the worst person to suggest a good variable name. Anyway, onwards:



checkpoints, err := col.GetSequenceNumberForShards(streamName, ids)
if err != nil {
    return fmt.Errorf("error retrieving stored checkpoints for shards: %v", err)
}


Right, so the "collector" has an exported function for getting sequence numbers, and you're expecting the collector to be passed in as an argument. This is kind of weird. It'd make much more sense for someone to use a Consumer object and call something like consumer.GetShards(), because that actually tells me something: I know that I'm working with a consumer that is taking its data from shards X, Y, and Z.

I've looked at what comes next, but I'll be returning to the checkpoints variable later on. First, let me recommend you look at the github.com/pkg/errors package. You're essentially returning a specific error (which is OK), but it's a hard one to detect on the caller side. The string value of the error still contains all of the raw/initial error value. With that package, you'd be able to write the same like so:



                return errors.Wrap(err, "error retrieving stored checkpoints for shards")


                Replace the string constant with an exported constant like so



const ErrRetrievingCheckpoints = "error retrieving stored checkpoints for shards"
// and use:
return errors.Wrap(err, ErrRetrievingCheckpoints)


                And you'll have a much easier time actually detecting what error was returned, without losing the underlying error details.



                OK, let's move on to the channel stuff now:



                    errc := make(chan error, 1)
                done := make(chan error, 1)
                wg := sync.WaitGroup{}
                wg.Add(len(ids))


Right, so you're creating a waitgroup and adding the length of ids to it, so there's going to be one routine per ID. Fine. You also create an error and a done channel, both with a buffer of 1. I'll explain how in a bit, but you can (and should) get rid of the done channel entirely. Having said that, channels like done are generally defined as done := make(chan struct{}): no buffer needed, and an empty struct is defined in the Go spec as a 0-byte type.



                The error channel, however, has a bit of a problem: Suppose I'm going to start 10 routines, and I've got 3 errors. The first routine to err will write its error value onto the channel, and be done with it. The second and third routines to fail will be blocked, waiting for you to read the error from the channel. You have the code to do this, so you'll get that error, and the second routine will now write a new error value to the channel. The third routine is still blocked!



                What your code does next, is cancel all routines (sort of), and wait for the wg.Wait() call to return. Because the third routine is blocked, this will never happen. You've got a deadlock on your hands. The done channel won't get closed, and thus this function won't return, all because your error channel is blocking routines from completing. That's not what we want. There's a quick (and not always ideal) solution to this problem: increase the channel buffer:



errCh := make(chan error, len(ids)-1) // we'll always read 1, hence -1 is big enough



                Of course, we're assuming ids is not empty. Just in case, it's always a good idea to add some basic checks like if len(ids) == 0 and return early.



                Still, I'll explain later how we fix this without having to increase the channel buffers too much. For now, let's carry on with the line-by-line review:



for _, id := range ids {
    seqNum := checkpoints[id]


Yes, I have an issue with this already. checkpoints clearly is a var of type map[string]string. I know this because ids is []string, and the seqNum variable is passed to a goroutine as a string. Why, then, iterate over a slice, and then perform a lookup in a map to get the corresponding value? Why not simply write:



                for id, seqNum := range checkpoints {


                That's a lot cleaner, and reduces the risk of someone else looking at your code and adding something like this:



seqNum, ok := checkpoints[id]
if !ok {
    return errors.New("checkpoint missing")
}


Never mind the risk of someone changing the GetSequenceNumberForShards function to not return an error if not all IDs were found! Suppose your ids var holds {"foo", "bar", "zar", "car"}, but the checkpoints map was returned without the zar key...



For the same reason, you really should replace wg.Add(len(ids)) with wg.Add(len(checkpoints)).



                Right, onwards, let's get stuck in with the context:



                        ctx, cancel := context.WithCancel(context.Background())
                c.cancelFuncs = append(c.cancelFuncs, cancel)


Why? Why on earth? You're creating a cancel context for each routine, and appending the cancel func to a slice without any way to retrieve a specific cancel function. The slice is also stored on the Consumer type. The function uses a pointer receiver, so there's a genuine risk of race conditions here! Suppose I do this:



go consumer.scanShards(col1, "foo", []string{"a", "b", "c"})
go consumer.scanShards(col2, "bar", []string{"foo", "bar", "car", "zar"})


Both routines are appending to the same slice of cancel functions. They're both calling all of them blindly, too. These routines are messing things up for each other. And that's just one example; imagine someone coming along and writing this:



func (c *Consumer) Reset() {
    // restoring some fields
    c.cancelFuncs = []context.CancelFunc{}
    // more fields here
}


These kinds of things can be really hard to debug, and your code is very vulnerable to these issues. You may think the Reset func I've written here is an unlikely scenario, but given that nothing in your code actually removes the cancel values from the slice, your slice will grow, and grow, and grow, and things will slow down.



OK, I had to rant about these things somewhat, but remember how I said that you ought to have your first argument be a context? Right, do it: it allows the caller to pass in a context.WithCancel or context.WithTimeout, for example, so the caller can determine whether they want to/need to wait on your function to return. Secondly: why not have all your routines share the same context? You're just re-wrapping the Background context anyway. Instead of doing that, I'd simply wrap the context from the argument once (outside the loop):



                rctx, cfunc := context.WithCancel(ctx)


If the context passed as an argument gets cancelled, the cancellation will propagate. If you want to cancel the context yourself, call cfunc, and all your routines will receive the cancellation signal (<-ctx.Done()). The caller is unaffected.



So with this in mind, let's rewrite the loop a bit (more improvements to follow below, but this is what we have thus far):



func (c *Consumer) scanShards(ctx context.Context, stream string, ids []string) error {
    if len(ids) == 0 {
        return ErrNoIdsProvided
    }
    checkpoints, err := c.collector.GetSequenceNumberForShards(stream, ids)
    if err != nil {
        return errors.Wrap(err, ErrRetrievingCheckpoints)
    }
    if len(checkpoints) == 0 {
        return ErrNothingToCheck // something like this, should be handled properly
    }
    // note ids is irrelevant, checkpoints is the source of truth now
    errCh := make(chan error, len(checkpoints)-1)
    // we'll get rid of this, but I've not explained how, so it's here still:
    done := make(chan struct{})
    wg := sync.WaitGroup{}
    wg.Add(len(checkpoints))
    // wrap around ctx argument once!
    rctx, cfunc := context.WithCancel(ctx)

    for id, seqNum := range checkpoints {
        go func(ctx context.Context, shardID, startSeqNum string) {
            defer wg.Done()
            if err := c.scanShard(ctx, shardID, startSeqNum); err != nil {
                errCh <- fmt.Errorf("error in shard %q: %v", shardID, err)
            }
        }(rctx, id, seqNum)
    }


OK, that's where we are now. Because rctx is declared outside the for loop and shared by all routines, you can simply write the routine like so:



go func(id, seqNum string) {
    defer wg.Done()
    // note: rctx, not ctx!
    if err := c.scanShard(rctx, id, seqNum); err != nil {
        errCh <- err
    }
}(id, seqNum)


                The routine centers around c.scanShard, a function you've not posted, and thus I have no idea what it does (how it uses the context etc...). What is clear to me, though, is that it's an unexported function, and you have control over it. You could just as easily make it into a function that behaves exactly like the routine you wrapped the call in, so you can replace the go func() bit simply with this:



                go c.scanShard(rctx, &wg, id, seqNum, errCh)
                // given scanShard looks like this:
                func (c *Consumer) scanShard(ctx context.Context, wg *sync.WaitGroup, id, seqNum string, errCh chan<- error)


                Anyway, the rest of your code is about tying things together:



go func() {
    wg.Wait()
    close(errc)
    close(done)
}()


So you wait for the routines to return before closing the channels, which is fair enough, seeing as writing to a closed channel is bad. Closing the done channel here, though, is what actually allows your function to return. But does it add any value?



err = <-errc
if err != nil {
    for _, cancel := range c.cancelFuncs {
        cancel()
    }
}
<-done // Wait for all goroutines to exit.
return err
}


And this is where things get really messy: you're first checking the error channel (fair enough), but because you're not sure whether you've passed the blocking read (err = <-errc) because the channel was closed or because an error was sent, you have to check whether you actually have an error. If not, you're still checking the done channel, completely pointlessly. Even with your code as it stands, the done channel is completely redundant; the error channel already does the same thing.



                Get rid of done, even with your current code, it serves no purpose whatsoever.





                Alternative approach



I've been ranting plenty for now, and I think this is a decent starting point. When I started looking at your code, my mind did jump to select statements, and to using ctx.Done() to stop calling the scanShard function. I'm not sure it's actually the better way for this particular case, but it might be worth considering. I'll include a quick write-up of how I'd write this function using ctx.Done and select to control the flow. It's a bit more verbose, and actually looks more complex than it needs to be IMHO, but it shows an alternative approach that in some cases is worth considering. I have to say, all code in this answer is untested and just written as I went along, so typos and bugs are possible:



package main

import (
    "context"
    "fmt"
    "sync"
)

func (c *Consumer) scanShards(ctx context.Context, streamName string, ids []string) error {
    checkpoints, err := c.collector.GetSequenceNumberForShards(streamName, ids)
    if err != nil {
        // zerolog
        c.logger.Error().Err(err).Msg("Error retrieving stored checkpoints for shards")
        return err
    }

    // buffered to len(checkpoints): no routine can ever block on send,
    // so the channel never needs to be closed
    errc := make(chan error, len(checkpoints))
    wg := sync.WaitGroup{}
    wg.Add(len(checkpoints))
    rctx, cfunc := context.WithCancel(ctx)
    for id, seqNum := range checkpoints {
        go func(shardID, startSeqNum string) {
            defer wg.Done()
            select {
            case <-rctx.Done():
                // context cancelled, no need to do anything
                return
            default:
                if err := c.scanShard(rctx, shardID, startSeqNum); err != nil {
                    c.logger.Error().Err(err).Msgf("Error in shard %q", shardID)
                    errc <- fmt.Errorf("error in shard %q: %v", shardID, err)
                }
            }
        }(id, seqNum)
    }
    go func() {
        // wg cleared -> cancel context
        wg.Wait()
        cfunc()
    }()
    select {
    case <-rctx.Done():
        // parent ctx was cancelled, or all routines finished;
        // check whether one of them reported an error first
        select {
        case err := <-errc:
            return err
        default:
            return nil
        }
    case err := <-errc:
        // error -> cancel the remaining routines
        cfunc()
        return err
    }
}






answered yesterday by Elias Van Ootegem





























